Complexity analysis – Big O notation tables

Searching

  • Depth First Search (DFS), on a graph of |V| vertices and |E| edges: time O(|E| + |V|) (worst case), space O(|V|)
  • Breadth First Search (BFS), on a graph of |V| vertices and |E| edges: time O(|E| + |V|) (worst case), space O(|V|)
  • Binary search, on a sorted array of n elements: time O(log(n)) (average and worst), space O(1)
  • Linear (brute force) search, on an array: time O(n) (average and worst), space O(1)
  • Shortest path by Dijkstra, using a min-heap as priority queue, on a graph with |V| vertices and |E| edges: time O((|V| + |E|) log |V|) (average and worst), space O(|V|)
  • Shortest path by Dijkstra, using an unsorted array as priority queue, on a graph with |V| vertices and |E| edges: time O(|V|^2) (average and worst), space O(|V|)
  • Shortest path by Bellman-Ford, on a graph with |V| vertices and |E| edges: time O(|V||E|) (average and worst), space O(|V|)
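
For illustration (this sketch is not part of the referenced table source), a standard iterative binary search in Java shows where the O(log(n)) time and O(1) space come from: the search range is halved on every iteration.

// Iterative binary search over a sorted int array.
// Each iteration halves the search range: O(log(n)) time, O(1) extra space.
public class BinarySearch {
    public static int search(int[] sorted, int target) {
        int lo = 0, hi = sorted.length - 1;
        while (lo <= hi) {
            int mid = lo + (hi - lo) / 2;   // avoids overflow of (lo + hi) / 2
            if (sorted[mid] == target) {
                return mid;                 // found: return the index
            } else if (sorted[mid] < target) {
                lo = mid + 1;               // discard the left half
            } else {
                hi = mid - 1;               // discard the right half
            }
        }
        return -1;                          // not found
    }

    public static void main(String[] args) {
        int[] data = {2, 3, 5, 7, 11, 13};
        System.out.println(search(data, 7));   // prints 3
        System.out.println(search(data, 4));   // prints -1
    }
}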

Sorting

All of the following operate on arrays; the space figure is worst-case auxiliary space.

  • Quicksort: best O(n log(n)), average O(n log(n)), worst O(n^2); auxiliary space O(log(n))
  • Mergesort: best O(n log(n)), average O(n log(n)), worst O(n log(n)); auxiliary space O(n)
  • Heapsort: best O(n log(n)), average O(n log(n)), worst O(n log(n)); auxiliary space O(1)
  • Bubble sort: best O(n), average O(n^2), worst O(n^2); auxiliary space O(1)
  • Insertion sort: best O(n), average O(n^2), worst O(n^2); auxiliary space O(1)
  • Selection sort: best O(n^2), average O(n^2), worst O(n^2); auxiliary space O(1)
  • Bucket sort: best O(n+k), average O(n+k), worst O(n^2); auxiliary space O(nk)
  • Radix sort: best O(nk), average O(nk), worst O(nk); auxiliary space O(n+k)
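
To make the best/worst split concrete, here is a plain insertion sort sketch (illustrative only, not taken from the table's source): it does O(n) work on an already sorted array, O(n^2) in the worst case, and uses O(1) auxiliary space.

import java.util.Arrays;

public class InsertionSort {
    // Sorts the array in place. Best case O(n) (already sorted: the inner
    // while loop never runs), worst case O(n^2), auxiliary space O(1).
    public static void sort(int[] a) {
        for (int i = 1; i < a.length; i++) {
            int key = a[i];
            int j = i - 1;
            while (j >= 0 && a[j] > key) {
                a[j + 1] = a[j];   // shift larger elements one slot to the right
                j--;
            }
            a[j + 1] = key;        // insert the key into its correct position
        }
    }

    public static void main(String[] args) {
        int[] data = {5, 2, 9, 1, 5, 6};
        sort(data);
        System.out.println(Arrays.toString(data)); // [1, 2, 5, 5, 6, 9]
    }
}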

Data Structures

For each structure, the operations are indexing, search, insertion and deletion; space complexity is worst case.

  • Basic Array: indexing O(1), search O(n), insertion and deletion not applicable (average and worst); space O(n)
  • Dynamic Array: indexing O(1), search O(n), insertion O(n) (average and worst); space O(n)
  • Singly-Linked List: indexing O(n), search O(n), insertion O(1), deletion O(1) (average and worst); space O(n)
  • Doubly-Linked List: indexing O(n), search O(n), insertion O(1), deletion O(1) (average and worst); space O(n)
  • Skip List: average: indexing O(n), search O(log(n)), insertion O(log(n)), deletion O(log(n)); worst: O(n) for each; space O(n log(n))
  • Hash Table: average: search O(1), insertion O(1), deletion O(1); worst: O(n) for each; space O(n)
  • Binary Search Tree: average: search O(log(n)), insertion O(log(n)), deletion O(log(n)); worst: O(n) for each; space O(n)
  • B-Tree: search O(log(n)), insertion O(log(n)), deletion O(log(n)) (average and worst); space O(n)
  • Red-Black Tree: search O(log(n)), insertion O(log(n)), deletion O(log(n)) (average and worst); space O(n)
  • AVL Tree: search O(log(n)), insertion O(log(n)), deletion O(log(n)) (average and worst); space O(n)

Heaps

Time complexity of heap operations, by implementation (entries marked * are amortized):

  • Linked List (sorted): find max O(1), extract max O(1), increase key O(n), insert O(n), delete O(1), merge O(m+n)
  • Linked List (unsorted): find max O(n), extract max O(n), increase key O(1), insert O(1), delete O(1), merge O(1)
  • Binary Heap: heapify O(log(n)), find max O(1), extract max O(log(n)), increase key O(log(n)), insert O(log(n)), delete O(log(n)), merge O(m+n)
  • Binomial Heap: find max O(log(n)), extract max O(log(n)), increase key O(log(n)), insert O(log(n)), delete O(log(n)), merge O(log(n))
  • Fibonacci Heap: find max O(1), extract max O(log(n))*, increase key O(1)*, insert O(1), delete O(log(n))*, merge O(1)
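
As a practical note, java.util.PriorityQueue is backed by a binary heap, so peek() is O(1) while offer() and poll() are O(log(n)). A small sketch (using a reversed comparator to get a max-heap):

import java.util.Collections;
import java.util.PriorityQueue;

public class HeapDemo {
    public static void main(String[] args) {
        // Max-heap via a reverse-order comparator; offer/poll are O(log(n)), peek is O(1)
        PriorityQueue<Integer> maxHeap = new PriorityQueue<>(Collections.reverseOrder());
        maxHeap.offer(5);
        maxHeap.offer(1);
        maxHeap.offer(9);

        System.out.println(maxHeap.peek()); // 9 (find max, O(1))
        System.out.println(maxHeap.poll()); // 9 (extract max, O(log(n)))
        System.out.println(maxHeap.poll()); // 5
    }
}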

Graphs

Node / edge management costs, by graph representation:

  • Adjacency list: storage O(|V|+|E|), add vertex O(1), add edge O(1), remove vertex O(|V| + |E|), remove edge O(|E|), query O(|V|)
  • Incidence list: storage O(|V|+|E|), add vertex O(1), add edge O(1), remove vertex O(|E|), remove edge O(|E|), query O(|E|)
  • Adjacency matrix: storage O(|V|^2), add vertex O(|V|^2), add edge O(1), remove vertex O(|V|^2), remove edge O(1), query O(1)
  • Incidence matrix: storage O(|V| ⋅ |E|), add vertex O(|V| ⋅ |E|), add edge O(|V| ⋅ |E|), remove vertex O(|V| ⋅ |E|), remove edge O(|V| ⋅ |E|), query O(|E|)

Reference: http://sandbox.runjs.cn/show/vsr4wsy7


Re-partitioning & partitioning in Spark

In Hadoop, partitioning the data allows a huge volume of data to be processed in parallel, so that the entire dataset takes the minimum amount of time to process. Apache Spark decides partitioning based on different factors. The factors that decide the default partitioning are:

  1. On Hadoop, data is split by HDFS input splits (blocks).
  2. Filter or map functions don't change the partitioning.
  3. When running in non-Hadoop mode, the number of CPU cores in the cluster decides the partitioning.

Re-partitioning increases the number of partitions, rebalances the partitions (for example after a filter) and increases parallelism.

You can define the number of partitions in Spark at the time of creating an RDD, as follows:

val users = sc.textFile("hdfs://at-r3p11:8020/project/users.csv", 1);

where the second argument is the number of partitions.

By default, if an HDFS path is not used, Spark creates partitions based on the number of cores; if an HDFS path is used, it creates partitions based on the input splits (the default HDFS block size).

To check the number of partitions, just enter the following in spark-shell:

users.partitions.size

Spark can run only one concurrent task for every partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to have at least 50 partitions (and probably 2-3x that).

As far as choosing a “good” number of partitions, you generally want at least as many as the number of executors for parallelism. You can get this computed value by calling sc.defaultParallelism.
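
As a minimal sketch with the Java RDD API (the input path, partition count and master are placeholders, not values from the original post), checking the partition count and the default parallelism could look like this:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PartitionInfo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("partition-info").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // second argument is the minimum number of partitions (placeholder path)
        JavaRDD<String> users = sc.textFile("data/users.csv", 4);

        System.out.println("Partitions: " + users.getNumPartitions());
        System.out.println("Default parallelism: " + sc.defaultParallelism());

        sc.close();
    }
}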

Also, the number of partitions determines how many files get generated by actions that save RDDs to files.

The maximum size of a partition is ultimately limited by the available memory of an executor.

In the first RDD transformation, e.g. reading from a file using sc.textFile(path, partition), the partition parameter will be applied to all further transformations and actions on this RDD.

When using textFile with compressed files (file.txt.gz, not file.txt or similar), Spark disables splitting, which results in an RDD with only one partition (as reads against gzipped files cannot be parallelized). In this case, to change the number of partitions you should do repartitioning.

Some operations, e.g. map, flatMap, filter, don’t preserve partitioning.

map, flatMap, filter operations apply a function to every partition.

rdd = sc.textFile('demo.gz')
rdd = rdd.repartition(100)

With these lines, you end up with rdd having exactly 100 partitions of roughly equal size.

  • rdd.repartition(N) does a shuffle to split the data to match N

  • partitioning is done on a round-robin basis

Note:
If the default partitioning scheme doesn't work for you, you can write your own custom partitioner, as sketched below.
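
As a hedged sketch (not from the original post), a custom partitioner written against the Java API could look like the following; the class name and the first-letter scheme are made up purely for illustration:

import org.apache.spark.Partitioner;

// Hypothetical partitioner: routes string keys by their first character,
// so keys starting with the same letter land in the same partition.
public class FirstLetterPartitioner extends Partitioner {
    private final int numParts;

    public FirstLetterPartitioner(int numParts) {
        this.numParts = numParts;
    }

    @Override
    public int numPartitions() {
        return numParts;
    }

    @Override
    public int getPartition(Object key) {
        String k = key.toString();
        int c = k.isEmpty() ? 0 : k.charAt(0);   // char value is non-negative
        return c % numParts;
    }

    // In production code you would also override equals() and hashCode(),
    // since Spark compares partitioners to decide whether a shuffle is needed.
}

// Usage (pairs is a JavaPairRDD<String, Integer>):
//   pairs.partitionBy(new FirstLetterPartitioner(26));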

coalesce transformation:

The coalesce transformation is used to change the number of partitions. It can trigger RDD shuffling depending on the shuffle flag (disabled by default, i.e. false).

In the following sample, you parallelize a local sequence of numbers and coalesce it without shuffling (note the shuffle parameter being false).
scala> val rdd = sc.parallelize(0 to 10, 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.partitions.size
res0: Int = 8

scala> rdd.coalesce(numPartitions=8, shuffle=false)   (1)
res1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:27

  1. shuffle is false by default and is used here explicitly for demo purposes. Note that the number of partitions remains the same as the number of partitions in the source RDD rdd.

Asynchronous processing in Java

Asynchronous programming is very popular these days, primarily because of its ability to improve the overall throughput on a multi-core system. Asynchronous programming is a programming paradigm that facilitates fast and responsive user interfaces. The asynchronous programming model in Java provides a consistent programming model to write programs that support asynchrony.

Asynchronous programming provides a non-blocking, event-driven programming model. This programming model leverages the multiple cores in your system to provide parallelization by using multiple CPU cores to execute the tasks, thus increasing the application’s throughput. Note that throughput is a measure of the amount of work done in unit time. In this programming paradigm, a unit of work would execute separately from the main application thread and notify the calling thread about its execution state: success, in progress or failure.

An application of asynchronous processing can be a situation where we want to execute multiple things in parallel without waiting for one task to finish, so that the throughput of the system increases. Consider that we want to send email to 100k+ users and at the same time need to process other data; we don't want to wait for the email task to complete before proceeding.

Another good example of this can be logging frameworks: You typically would want to log exceptions and errors into your log targets; in other words, file, database, or something similar. There is no point for your application to wait till the logging tasks are over. In doing so, the application’s responsiveness would be affected. On the contrary, if the call to the logging framework can be made asynchronously, the application can proceed with other tasks concurrently, without having to wait. This is an example of a non-blocking mode of execution.

1. Future is a base interface that defines the abstraction of an object which promises a result to be available in the future, while FutureTask is an implementation of the Future interface.

2. Future is a parameterized, type-safe interface, written as Future<V>, where V denotes the type of the value.

3. Future provides the get() method to obtain the result; it is a blocking method and blocks until the result is available to the Future.

4. The Future interface also defines the cancel() method to cancel the task.

5. The isDone() and isCancelled() methods are used to query the Future's task state. isDone() returns true if the task is completed and the result is available to the Future; if you call get() after isDone() has returned true, it returns immediately. On the other hand, isCancelled() returns true if the task was cancelled before its completion.

6. Future has four sub-interfaces, each with additional functionality, e.g. Response, RunnableFuture, RunnableScheduledFuture and ScheduledFuture. RunnableFuture also implements Runnable, and a successful finish of the run() method causes completion of the Future.

7. FutureTask and SwingWorker are two well-known implementations of the Future interface. FutureTask also implements the RunnableFuture interface, which means it can be used as a Runnable and can be submitted to an ExecutorService for execution.

8. Though most of the time the ExecutorService creates a FutureTask for you, i.e. when you submit() a Callable or Runnable object, you can also create it manually.

9. FutureTask is normally used to wrap a Runnable or Callable object and submit it to an ExecutorService for asynchronous execution (a minimal sketch is given after this list).
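
As a minimal sketch of points 8 and 9 (creating a FutureTask manually instead of letting submit() build one), something like this could be used; the Callable body is made up for illustration:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws Exception {
        // Wrap a Callable in a FutureTask manually
        FutureTask<Integer> task = new FutureTask<>(() -> {
            Thread.sleep(100);      // simulate some work
            return 6 * 7;
        });

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(task);         // FutureTask is a Runnable, so execute() works

        System.out.println("Result: " + task.get()); // blocks until the result is ready
        pool.shutdown();
    }
}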

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Java program to show how to use Future in Java. Future allows to write
 * asynchronous code in Java, where the Future promises the result to be
 * available in the future.
 *
 * @author Javin
 */
public class FutureDemo {
    private static final ExecutorService threadpool = Executors.newFixedThreadPool(2);

    public static void main(String args[]) throws InterruptedException, ExecutionException {
        FactorialCalculator task = new FactorialCalculator(1000);

        System.out.println("Submitting Task ...");
        Future<Long> future = threadpool.submit(task);
        System.out.println("Task is submitted");

        while (!future.isDone()) {
            System.out.println("Task is not completed yet....");
            Thread.sleep(1); // sleep for 1 millisecond before checking again
        }

        System.out.println("Task is completed, let's check result");
        long factorial = future.get();
        System.out.println("Factorial of 1000 is : " + factorial);

        threadpool.shutdown();
    }

    private static class FactorialCalculator implements Callable<Long> {
        private final int number;

        public FactorialCalculator(int number) {
            this.number = number;
        }

        @Override
        public Long call() {
            long output = 0;
            try {
                output = factorial(number);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // restore the interrupt status
            }
            return output;
        }

        // Note: long overflows for inputs this large; the demo is about Future, not exact factorials.
        private long factorial(int number) throws InterruptedException {
            if (number < 0) {
                throw new IllegalArgumentException("Number must not be negative");
            }
            long result = 1;
            while (number > 0) {
                Thread.sleep(1); // adding delay for example
                result = result * number;
                number--;
            }
            return result;
        }
    }
}

Usage in the Spring framework is given in the link below:

 

Demystifying Asynchronous Actions in Spark

(Reblogged from Knoldus)

What if we want to execute two actions concurrently on different RDDs? Spark actions are normally synchronous: if we perform two actions one after the other, they always execute sequentially.

Let's see an example.

Suppose two actions, collect and count, are performed one after the other; both execute synchronously, so count will always execute after collect finishes.


Now the question is: what if we want to run Spark jobs concurrently, in an asynchronous fashion?

The answer is simple: Apache Spark also provides asynchronous actions for the concurrent execution of jobs. A few of the asynchronous actions Spark provides are as follows (a usage sketch is given after the list):

collectAsync() -> Returns a future for retrieving all elements of this RDD.
countAsync() -> Returns a future for counting the number of elements in the RDD.
foreachAsync(scala.Function1<T,scala.runtime.BoxedUnit> f) -> Applies a function f to all elements…
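
As a small, hypothetical sketch using the Java API (the data and app name are made up), two asynchronous actions can be started without blocking on each other:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class AsyncActionsDemo {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("async-actions").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> first = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        JavaRDD<Integer> second = sc.parallelize(Arrays.asList(10, 20, 30));

        // Both jobs are submitted without blocking, so the driver does not wait
        // for the first action to finish before starting the second one.
        JavaFutureAction<List<Integer>> collected = first.collectAsync();
        JavaFutureAction<Long> counted = second.countAsync();

        System.out.println("Collected: " + collected.get()); // blocks until done
        System.out.println("Count: " + counted.get());

        sc.close();
    }
}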


Apache Oozie – A Scheduling System

Introduction

In a batch processing system, we have to schedule jobs which run periodically. This creates a lot of overhead in the deployment and maintenance of the system. As a solution to this, Oozie provides workflows in XML format using which we can define multiple Map/Reduce jobs into a logical unit of work, accomplishing the larger task [4].

Workflows work perfectly when invoked on demand or manually. But for achieving a higher level of automation and effectiveness, it becomes necessary to run them based on one or more of the following parameters: regular time intervals, data availability or external events. Then we need more functionality than Oozie workflows provide.

In this paper, Oozie Coordinator Jobs will be discussed, which provide options to embed workflows and trigger them on regular time intervals or on the basis of data availability.

The Oozie coordinator allows expressing conditions to trigger execution of a workflow in the form of predicates [1]. These predicates are conditional statements on parameters like time, data and external events. Only if the predicate is satisfied is the workflow job/action started.

Oozie Coordinator System

As stated on the Oozie documentation page [1], "Oozie is a Java Web-Application that runs in a Java servlet-container". It uses XML for taking configuration inputs from the user and uses a database (Derby by default, but MySQL, HSQLDB or any other RDBMS can also be used) to store:

 

  • Definitions of Workflow and Coordinator
  • Currently running workflow and Coordinator instances, including instance states, configuration variables and parameters.

Oozie Coordinator is a collection of predicates (conditional statements based on time-frequency and data availability) and actions (i.e. Hadoop Map/Reduce jobs, Hadoop file system, Hadoop Streaming, Pig, Java and Oozie sub-workflows). Actions are recurrent workflow jobs invoked each time the predicate returns true.

Oozie version 2 and higher supports Coordinator Jobs. Coordinator Job is defined in the XML Process Definition Language.

Predicates are conditional statements, defined using attributes “interval, start-time and end-time” for time-based triggering and xml-tags “dataset and input-events” for data-availability based triggering of workflows.
Actions are the mechanism by which a workflow is triggered for the execution of a computation/processing task. Action contains description of one or more workflows to be executed.

Oozie is lightweight as it uses existing Hadoop Map/Reduce framework for executing all tasks in a workflow. This approach allows it to leverage existing Hadoop installation for providing scalability, reliability, parallelism, etc.
On the basis of functionality, Coordinator can be sub-divided into two major groups [2]:

1. Time-Based Coordinator: This type of coordinator definition is used for invoking the workflow repeatedly at a given interval within a specified period of time.

2. File-Based Coordinator: This type of coordinator definition is used for invoking the workflow on the basis of data availability and data polling.

2.1 Simple File-Based Coordinator: The action is invoked whenever the data-available predicate is true.

2.2 Sliding Window-Based Coordinator: It is invoked frequently and data is aggregated over multiple overlapping previous instances. For example, invoking it at a frequency of 5 minutes and running the action on the aggregated previous 4 instances of 15 minutes of data.

2.3 Rollups-Based Coordinator: It is invoked after a long period of time and data is aggregated over multiple previous instances since the last time of invocation. For example, it will run once a day and will trigger a workflow that aggregates 24 instances of hourly data.

Oozie Coordinator Components and Variables

  • Coordinator-App: It is a wrapper component that defines the attributes of a coordinator and includes all other components.

Attributes are:

  • start, end: describe the start and end time in yyyy-mm-ddThh:mmZ format.
  • Timezone: describes the time zone (the value of Z in the above time format), e.g. UTC.
  • Controls: It contains parameters like timeout, concurrency, etc. to configure the execution of coordinator job.
  • Datasets: It contains the definition of multiple data sources and frequency of data polling.

Attributes are:

  • Frequency: interval of time at which data polling is done.
  • Initial-Instance: start time of data polling in yyyy-mm-ddThh:mmZ format.
  • Uri-Template: URI of the data source. Expression language can be used. For example, ${YEAR} corresponds to current year. It helps in dynamic selection of data source directories.
  • Done-flag: This flag denotes the success of data polling. It can be a file, in which case the presence of the file is checked before calling the action; otherwise it can be left empty for an implicit success message.
  • Input-Events:  denotes the processing of the input data before running the action.
  • Data-in: it denotes the aggregated output data of input-event.
  • Start-instance and end-instance: boundary of data instances that needs to be aggregated.
  • Output-Events:  denotes the processing of the output data after running the action.
  • Data-out: it denotes the output dataset.
  • Instance:  instance of dataset that is to be used as sink for output.
  • Action: It includes the path of the workflow that has to be invoked when the predicate returns true.

It could also be configured to record the events required to evaluate SLA compliance.

Oozie Coordinator Lifecycle Operations

 

The lifecycle operations of a coordinator are similar to those of an Oozie workflow, except for the start operation: "start" is not applicable for coordinators.

  • Submit/Run: Both operations submit the coordinator job to Oozie. The job will be in PREP state until the mentioned start time of the coordinator.
  • Suspend: Suspends/pauses the coordinator job.
  • Resume: Resumes the execution of the coordinator job.
  • Kill: Kills the coordinator job and ends its execution.
  • reRun: Re-submits the coordinator job/actions with new parameters.

Oozie Coordinator Example   

In this section, we will see how to use oozie coordinator for scheduling and triggering of the workflows.

 

  • A Sample Workflow: First of all, we need an Oozie workflow job. For example purposes, I have taken the simple wordcount example provided by the Apache Hadoop distribution in hadoop-examples-0.20.2-cdh3u0.jar [6].

The workflow for wordcount is:

<workflow-app xmlns='uri:oozie:workflow:0.1' name='java-main-wf'>
    <start to='mapreduce-wordcount-example' />
    <action name='mapreduce-wordcount-example'>
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <main-class>org.apache.hadoop.examples.ExampleDriver</main-class>
            <arg>wordcount</arg>
            <arg>${inputDir}</arg>
            <arg>${outputDir}</arg>
        </java>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end' />
</workflow-app>

Once the workflow is created, it has to be deployed correctly. A typical Oozie deployment is an HDFS directory containing workflow.xml and a lib subdirectory containing the jar files of the classes used by the workflow actions.
For example, the directory structure in Hadoop will be as shown below (if user.name is training).

[training@localhost ~]$ hadoop dfs -ls /user/training/oozie/workflow/wordcount
Found 2 items
drwxr-xr-x   - training supergroup          0 2012-09-18 12:05 /user/training/oozie/workflow/wordcount/lib
-rw-r--r--   1 training supergroup        918 2012-09-18 11:47 /user/training/oozie/workflow/wordcount/workflow.xml

The job.properties file will have following properties:

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
inputDir=${nameNode}/data.in
outputDir=${nameNode}/out

user.name=training

oozie.wf.application.path=${nameNode}/user/${user.name}/oozie/workflow/wordcount/

With the job properties in place, this workflow can be invoked manually using the oozie job command from the command line:

[training@localhost Desktop]$ oozie job -oozie=http://localhost:11000/oozie/ -config oozie/wordcount-demo/workflow/job.properties -run;

job: 0000000-120918134457517-oozie-oozi-W

2. Oozie Coordinator Definition: As discussed above, coordinator definitions will be different for different kinds of triggering and scheduling.

So, we will take each kind of coordinator one by one and schedule the wordcount example on that basis.
Moreover, Oozie coordinators can be parameterized using variables like ${inputDir}, ${startTime}, etc. within the coordinator definition. When submitting a coordinator job, values for the parameters must be provided as input. As parameters are key-value pairs, they can be written in a job.properties file or an XML file. Parameters can also be provided in the form of a Java Map object when using the Java API to invoke a coordinator job.

  • Time-Based Coordinator

The generic definition for this kind of coordinator is

<coordinator-app name="coordinator1" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${workflowPath}</app-path>
        </workflow>
    </action>
</coordinator-app>

Save the file as coordinator.xml in an HDFS directory. (Please note that coordinator.xml is the only name which can be given to the file, as Oozie uses this default name for reading the file in the HDFS directory.)

The coordinatorjob.properties can be defined as
frequency=60
startTime=2012-08-31T20\:20Z
endTime=2013-08-31T20\:20Z
timezone=GMT+0530

workflowPath=${nameNode}/user/${user.name}/oozie/workflow/wordcount/

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
inputDir=${nameNode}/data.in
outputDir=${nameNode}/out

oozie.coord.application.path=${nameNode}/user/${user.name}/coordOozie/coordinatorTimrBased

The coordinator application path must be specified in the file with the oozie.coord.application.path property. Specified path must be an HDFS path.

  • File-Based Coordinator

<coordinator-app name="coordinator1" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
    <datasets>
        <dataset name="input1" frequency="${datasetfrequency}" initial-instance="${datasetinitialinstance}" timezone="${datasettimezone}">
            <uri-template>${dataseturitemplate}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
            <done-flag> </done-flag>
        </dataset>
    </datasets>
    <input-events>
        <data-in name="coordInput1" dataset="input1">
            <start-instance>${inputeventstartinstance}</start-instance>
            <end-instance>${inputeventendinstance}</end-instance>
        </data-in>
    </input-events>
    <action>
        <workflow>
            <app-path>${workflowPath}</app-path>
        </workflow>
    </action>
</coordinator-app>

Save the file as coordinator.xml in an HDFS directory. (Please note that coordinator.xml is the only name which can be given to the file, as Oozie uses this default name for reading the file in the HDFS directory.)

The coordinatorjob.properties can be defined as

frequency=60
startTime=2012-08-21T15:25Z
endTime=2012-08-22T15:25Z
timezone=UTC
datasetfrequency=15
datasetinitialinstance=2012-08-21T15:30Z
datasettimezone=UTC
dataseturitemplate=${nameNode}/user/hadoop/oozie/coordinator/in
inputeventstartinstance=${coord:current(0)}
inputeventendinstance=${coord:current(0)}

workflowPath=${nameNode}/user/${user.name}/oozie/workflow/wordcount/

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
inputDir=${coord:dataIn('coordInput1')}
outputDir=${nameNode}/out

oozie.coord.application.path=${nameNode}/user/${user.name}/coordOozie/coordinatorFileBased

The coordinator application path must be specified in the file with the oozie.coord.application.path property. Specified path must be an HDFS path.

  • Sliding-Window Based Coordinator

This is a specific use case of the File-Based Coordinator where the coordinator is invoked frequently and data is aggregated over multiple overlapping previous instances.
The rule for this can be generalized as
Coordinator-frequency < DataSet-Frequency

For example, the coordinator job.properties will be like

frequency=5

datasetfrequency=15
……
  • Rollups Based Coordinator

This is a specific use case of the File-Based Coordinator where the coordinator is invoked after a long period of time and data is aggregated over multiple previous instances since the last time of invocation.

The rule for this can be generalized as
Coordinator-frequency > DataSet-Frequency

frequency=1440
….
datasetfrequency=60
…….

Running Coordinator Example from Command line

  • Submitting/Running the coordinator job
$ oozie job -oozie http://localhost:11000/oozie -config coordinatorjob.properties [-submit][-run]
job: 0000672-120823182447665-oozie-hado-C

The parameters for the job must be provided in a file, either a Java Properties file (.properties) or a Hadoop XML Configuration file (.xml). This file must be specified with the -config option.

  • Suspending the coordinator job
$ oozie job -oozie http://localhost:11000/oozie -suspend 0000673-120823182447665-oozie-hado-C
  • Resuming a Coordinator Job
$ oozie job -oozie http://localhost:11000/oozie -resume 0000673-120823182447665-oozie-hado-C
  • Killing a Coordinator Job
$ oozie job -oozie http://localhost:11000/oozie -kill 0000673-120823182447665-oozie-hado-C
  • Rerunning a Coordinator Action or Multiple Actions
$ oozie job -rerun 0000673-120823182447665-oozie-hado-C [-nocleanup]
[-refresh][-action 1,3-5] [-date 2012-01-01T01:00Z::2012-05-31T23:59Z, 2012-11-10T01:00Z, 2012-12-31T22:00Z]

-action or -date is required for a rerun. If neither -action nor -date is given, an exception will be thrown.

  • Checking the Status of a Coordinator/Workflow job or a Coordinator Action

$ oozie job -oozie http://localhost:11000/oozie -info 0000673-20823182447665-oozie-hado-C

The info option can display information about a workflow job or coordinator job or coordinator action.

Invoking Coordinator Jobs from Java Client

Oozie exposes a Java API for invoking and controlling workflows programmatically. The same API is also applicable to coordinators, but with some changes, as coordinators and workflows differ in functioning.

 

// Requires the Oozie client library on the classpath (classes such as OozieClient,
// OozieClientException and CoordinatorJob come from org.apache.oozie.client)
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

// The service for executing coordinators on Oozie
public class CoordinatorOozieService {

    // Oozie client
    OozieClient oozieClient = null;

    public CoordinatorOozieService(String url) {
        oozieClient = new OozieClient(url);
    }

    // To submit the coordinator job on Oozie from a properties file
    public String submitJob(String jobPropertyFilePath) throws OozieClientException, IOException {

        // create an empty coordinator job configuration object
        // with just the USER_NAME set to the JVM user name
        Properties conf = oozieClient.createConfiguration();

        conf.setProperty("user.name", "training");

        // set the coordinator properties
        conf.load(new FileInputStream(jobPropertyFilePath));

        // submit the coordinator job
        return oozieClient.submit(conf);
    }

    // To submit the coordinator job on Oozie from a Properties object
    public String submitJob(Properties workflowProperties) throws OozieClientException, IOException {

        // create an empty coordinator job configuration object
        // with just the USER_NAME set to the JVM user name
        Properties conf = oozieClient.createConfiguration();

        // set the coordinator properties
        conf.putAll(workflowProperties);

        conf.setProperty("user.name", "training");

        // submit the coordinator job
        return oozieClient.submit(conf);
    }

    // To run (submit and start) the coordinator job on Oozie
    public String runJob(String jobPropertyFilePath) throws OozieClientException, IOException {

        // create an empty coordinator job configuration object
        // with just the USER_NAME set to the JVM user name
        Properties conf = oozieClient.createConfiguration();

        conf.setProperty("user.name", "training");

        // set the coordinator properties
        conf.load(new FileInputStream(jobPropertyFilePath));

        // submit and start the coordinator job
        return oozieClient.run(conf);
    }

    // To suspend the coordinator job on Oozie
    public void suspendJob(String jobId) throws OozieClientException {
        oozieClient.suspend(jobId);
    }

    // To resume the coordinator job on Oozie
    public void resumeJob(String jobId) throws OozieClientException {
        oozieClient.resume(jobId);
    }

    // To kill the coordinator job on Oozie
    public void killJob(String jobId) throws OozieClientException {
        oozieClient.kill(jobId);
    }

    // To get the status of the coordinator job with id <jobID>
    public Status getJobStatus(String jobID) throws OozieClientException {
        CoordinatorJob job = oozieClient.getCoordJobInfo(jobID);
        return job.getStatus();
    }
}
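
A small, hypothetical usage sketch of the class above (the Oozie URL and the properties path are placeholders, and the Oozie client library is assumed to be on the classpath):

public class CoordinatorClientDemo {
    public static void main(String[] args) throws Exception {
        // URL of the Oozie server and path to the coordinator job.properties (placeholders)
        CoordinatorOozieService service = new CoordinatorOozieService("http://localhost:11000/oozie");

        // submit and start the coordinator job
        String jobId = service.runJob("/home/training/coordinatorjob.properties");
        System.out.println("Started coordinator job: " + jobId);

        // query its status
        System.out.println("Status: " + service.getJobStatus(jobId));
    }
}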

 

Conclusion

The Oozie Coordinator can be used for efficient scheduling of Hadoop-related workflows. It also helps in triggering them on the basis of the availability of data or external events. Moreover, it provides a lot of configurable and pluggable components which help in easy and effective deployment and maintenance of Oozie workflow jobs.
As the coordinator is specified in XML, it is easy to integrate it with J2EE applications. Invoking coordinator jobs through Java has already been explained above.

Enhancements

Oozie provides a new component, "Bundle", in its latest version 3. It provides a higher-level abstraction in which it creates a set of coordinator applications, often called a data pipeline. Data dependencies can be inserted between multiple coordinator jobs to create an implicit data application pipeline. Oozie lifecycle operations (start/stop/suspend/resume/rerun) can also be applied at the bundle level, which results in better and easier operational control.

How to generate graphs in Python

Friends, Python is a language which is very easy and rich, having tons of libraries for various purposes. Today I am going to show you how to use the matplotlib library for plotting graphs as well as saving the graph to a particular directory.

import numpy as np
import matplotlib.pyplot as plt
import os

N = 5
menMeans = (20, 35, 30, 35, 27)
menStd = (2, 3, 4, 1, 2)

ind = np.arange(N)  # the x locations for the groups
width = 0.35        # the width of the bars

fig, ax = plt.subplots()
rects1 = ax.bar(ind, menMeans, width, color='r', yerr=menStd)

womenMeans = (25, 32, 34, 20, 25)
womenStd = (3, 5, 2, 3, 3)
rects2 = ax.bar(ind + width, womenMeans, width, color='y', yerr=womenStd)

# add some text for labels, title and axes ticks
ax.set_ylabel('Scores')
ax.set_title('Scores by group and gender')
ax.set_xticks(ind + width)
ax.set_xticklabels(('G1', 'G2', 'G3', 'G4', 'G5'))

ax.legend((rects1[0], rects2[0]), ('Men', 'Women'))


def autolabel(rects):
    # attach a text label above each bar showing its height
    for rect in rects:
        height = rect.get_height()
        ax.text(rect.get_x() + rect.get_width() / 2., 1.05 * height,
                '%d' % int(height),
                ha='center', va='bottom')


autolabel(rects1)
autolabel(rects2)

# plt.show()


def save(path, ext='png', close=True, verbose=True):
    # Extract the directory and filename from the given path
    directory = os.path.split(path)[0]
    filename = "%s.%s" % (os.path.split(path)[1], ext)
    if directory == '':
        directory = '.'
    # If the directory does not exist, create it
    if not os.path.exists(directory):
        os.makedirs(directory)
    # The final path to save to
    savepath = os.path.join(directory, filename)
    if verbose:
        print("Saving figure to '%s'..." % savepath)
    # Actually save the figure
    plt.savefig(savepath)
    # Close it
    if close:
        plt.close()
    if verbose:
        print("Done")


save("/path/to/file", ext="png", close=False, verbose=True)

Python – Java integration (Jython continued…)

Friends, having knowledge of multiple languages is good, but sometimes it becomes cumbersome to use libraries written in one language from another. Jython provides a way to run Python on the JVM and hence allows integration of both Java and Python: we can simply use Java classes and functions in Python, as well as Python modules in Java.

Below I am going to give an example of this.

  • We are going to create an interface in Java which will be implemented in Python and then called again from Java.
  1. Create a package named org.jython.book.interfaces and define the interface as given below.

package org.jython.book.interfaces;

// Java interface for a building object
public interface BuildingType {

    public String getBuildingName();

    public String getBuildingAddress();

    public String getBuildingId();

}

  2. Create a python module which implements the above interface.

from org.jython.book.interfaces import BuildingType

class Building(BuildingType):
    def __init__(self, name, address, id):
        self.name = name
        self.address = address
        self.id = id

    def getBuildingName(self):
        return self.name

    def getBuildingAddress(self):
        return self.address

    def getBuildingId(self):
        return self.id

3. Create another package called org.jython.book.util.

Create a class named BuildingFactory.java.

 

package org.jython.book.util;

import org.jython.book.interfaces.BuildingType;
import org.python.core.PyObject;
import org.python.core.PyString;
import org.python.util.PythonInterpreter;

public class BuildingFactory {

    private PyObject buildingClass;

    /**
     * Create a new PythonInterpreter object, then use it to execute some Python
     * code. In this case, we want to import the Python module that we will
     * coerce.
     *
     * Once the module is imported, we obtain a reference to it and assign
     * the reference to a Java variable.
     */
    public BuildingFactory() {
        PythonInterpreter interpreter = new PythonInterpreter();
        interpreter.exec("import sys\n"
                + "sys.path.append('/root/NetBeansProjects/JythonR/src/org/jython/book/interfaces/')\n"
                + "from Building import Building");
        buildingClass = interpreter.get("Building");
    }

    /**
     * The create method is responsible for performing the actual coercion of
     * the referenced Python class into Java bytecode.
     * @param name
     * @param location
     * @param id
     * @return BuildingType
     */
    public BuildingType create(String name, String location, String id) {
        PyObject buildingObject = buildingClass.__call__(new PyString(name),
                new PyString(location),
                new PyString(id));

        BuildingType type = (BuildingType) buildingObject.__tojava__(BuildingType.class);
        System.out.println(type.getClass());
        return type;
    }
}

4. Now simply write a main method

package org.jython.book;

import org.jython.book.interfaces.BuildingType;
import org.jython.book.util.BuildingFactory;

public class Main {

    private static void print(BuildingType building) {
        System.out.println("Building Info: "
                + building.getBuildingId() + " "
                + building.getBuildingName() + " "
                + building.getBuildingAddress());
    }

    /**
     * Create three building objects by calling the create() method of
     * the factory.
     */
    public static void main(String[] args) {
        BuildingFactory factory = new BuildingFactory();
        print(factory.create("BUILDING-A", "100 WEST MAIN", "1"));
        print(factory.create("BUILDING-B", "110 WEST MAIN", "2"));
        print(factory.create("BUILDING-C", "120 WEST MAIN", "3"));
    }

}

 

The BuildingFactory class creates a factory object which converts the Python object into a Java object.

 

Run the program. Remember, you will need to install Jython on your system and add the Jython jar file to your class library in order to run it.

 

 

Front Controller design pattern

The front controller pattern makes sure that there is one and only one point of entry. All requests are investigated, routed to the designated controller and then processed according to the specification. The front controller is responsible for initializing the environment and routing requests to designated controllers.

The front controller design pattern is used to provide a centralized request handling mechanism so that all requests are handled by a single handler. This handler can perform authentication, authorization, logging or tracking of the request and then pass the request to the corresponding handler. Following are the entities of this type of design pattern.

  • Front Controller – Single handler for all kinds of requests coming to the application (either web based or desktop based).
  • Dispatcher – The Front Controller may use a dispatcher object which can dispatch the request to the corresponding specific handler.
  • View – Views are the objects for which the requests are made.

Implementation

We are going to create a FrontController and Dispatcher to act as Front Controller and Dispatcher correspondingly. HomeView and StudentView represent various views for which requests can come to front controller.

FrontControllerPatternDemo, our demo class, will use FrontController to demonstrate Front Controller Design Pattern.

(Front Controller Pattern UML diagram omitted)

Step 1

Create Views.

HomeView.java

public class HomeView {
   public void show(){
      System.out.println("Displaying Home Page");
   }
}

StudentView.java

public class StudentView {
   public void show(){
      System.out.println("Displaying Student Page");
   }
}

Step 2

Create Dispatcher.

Dispatcher.java

public class Dispatcher {
   private StudentView studentView;
   private HomeView homeView;
   
   public Dispatcher(){
      studentView = new StudentView();
      homeView = new HomeView();
   }

   public void dispatch(String request){
      if(request.equalsIgnoreCase("STUDENT")){
         studentView.show();
      }
      else{
         homeView.show();
      }	
   }
}

Step 3

Create FrontController

FrontController.java

public class FrontController {
	
   private Dispatcher dispatcher;

   public FrontController(){
      dispatcher = new Dispatcher();
   }

   private boolean isAuthenticUser(){
      System.out.println("User is authenticated successfully.");
      return true;
   }

   private void trackRequest(String request){
      System.out.println("Page requested: " + request);
   }

   public void dispatchRequest(String request){
      //log each request
      trackRequest(request);
      
      //authenticate the user
      if(isAuthenticUser()){
         dispatcher.dispatch(request);
      }	
   }
}

Step 4

Use the FrontController to demonstrate Front Controller Design Pattern.

FrontControllerPatternDemo.java

public class FrontControllerPatternDemo {
   public static void main(String[] args) {
   
      FrontController frontController = new FrontController();
      frontController.dispatchRequest("HOME");
      frontController.dispatchRequest("STUDENT");
   }
}

Step 5

Verify the output.

Page requested: HOME
User is authenticated successfully.
Displaying Home Page
Page requested: STUDENT
User is authenticated successfully.
Displaying Student Page

Event – Observer pattern

Imagine a situation: you are developing a custom e-commerce framework and are about to put some finishing touches to your order model. The task is to program functionality, which creates an order at the end of the checkout and saves it to the database. You realize that a confirmation email has to be sent and you add the necessary code. Next morning a product manager asks you to send a copy of the e-mail to his address, and you extend your code accordingly. At a meeting in the afternoon the warehouse guy proposes to automate inventory management by sending a message to the warehouse software; and you implement it in your code. Exporting order as an XML file to feed into the shipping system – done. Notifying the purchasing department when the inventory is too low – done too.

After all this you take a look at your order model and its placeOrder function and see that it has become an unmaintainable mess. All these diverse and complex tasks just do not fit into the order model, yet they are essential for your business and must be implemented. Situations like this are not uncommon. The growing complexity of enterprise applications often results in code that is inflexible, difficult to maintain, and prohibitively expensive to scale.

Event-driven software architecture has evolved to address such problems by decoupling services and service providers. It introduces events – notable milestones in business processes that invoke services, which observe and react to them. Events alert their subscribers about a problem, or an opportunity, or a threshold in the current process flow. An event broadcast in the system usually consists of an event header and body. The event header contains an ID that is used to locate subscribers, while the body transports the information required to process the event. In some systems event headers can also include information on the event type and creator, or a timestamp – whatever data the specification mandates.

The service providers are independent entities and can be added or removed without affecting the objects whose events they listen to. Event creators have no knowledge of the subscribed service providers and do not depend on them. Similarly, service providers are not interested in the internal mechanics of event creators. This allows for extremely flexible, loosely coupled and distributed systems. These advantages, however, come at a price: tracing events and their subscribers can be difficult.

Below is a simple example of the Event-Observer pattern.

Application: Suppose we want to print different representations of a given value, say its hex, octal and binary forms; we can apply the event-observer pattern for this as given below:

Observer.java

abstract class Observer {

    protected Subject subject;

    public abstract void update();
}

 

Subject.java

import java.util.ArrayList;
import java.util.List;

public class Subject {

    private List<Observer> observers = new ArrayList<Observer>();
    private int state;

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
        notifyAllObservers();
    }

    public void attach(Observer observer) {
        observers.add(observer);
    }

    public void notifyAllObservers() {
        for (Observer observer : observers) {
            observer.update();
        }
    }
}

BinaryObserver.java
public class BinaryObserver extends Observer {

    public BinaryObserver(Subject subject) {
        this.subject = subject;
        this.subject.attach(this);
    }

    @Override
    public void update() {
        System.out.println("Binary String: " + Integer.toBinaryString(subject.getState()));
    }
}

OctalObserver.java

public class OctalObserver extends Observer {

    public OctalObserver(Subject subject) {
        this.subject = subject;
        this.subject.attach(this);
    }

    @Override
    public void update() {
        System.out.println("Octal String: " + Integer.toOctalString(subject.getState()));
    }
}

 

HexaObserver.java

public class HexaObserver extends Observer {

    public HexaObserver(Subject subject) {
        this.subject = subject;
        this.subject.attach(this);
    }

    @Override
    public void update() {
        System.out.println("Hex String: " + Integer.toHexString(subject.getState()).toUpperCase());
    }
}

 

ObserverPatternDemo.java

public class ObserverPatternDemo {

    public static void main(String[] args) {
        Subject subject = new Subject();

        new HexaObserver(subject);
        new OctalObserver(subject);
        new BinaryObserver(subject);

        System.out.println("First state change: 15");
        subject.setState(15);
        System.out.println("Second state change: 10");
        subject.setState(10);
    }
}

As soon as the event is triggered, it is propagated to all attached observers. A beautiful design pattern to code!
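
With the code above, running ObserverPatternDemo should print:

First state change: 15
Hex String: F
Octal String: 17
Binary String: 1111
Second state change: 10
Hex String: A
Octal String: 12
Binary String: 1010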