Elasticsearch – Aggregation

Elasticsearch Aggregation provides capability similar to RDBMS group by opeartor.
Facets provide a great way to aggregate data within a document set context. This context is defined by the executed query in combination with the different levels of filters that can be defined (filtered queries, top-level filters, and facet level filters). While powerful, their implementation is not designed from the ground up to support complex aggregations and is thus limited.
An aggregation can be seen as a unit-of-work that builds analytic information over a set of documents.
There are many different types of aggregation, each with it’s own purpose & output. To Better understand these type, It is often best to break down into 2 families.
1. Bucketing
– A family of aggregations that build buckets , where each bucket is associated with key and a document criterion
– When the aggregation is executed, all the buckets criteria are evaluated on every document in the context and when a criterion matches, the document is considered to “fall in” the relevant bucket
– By the end of the aggregation process, we’ll end up with a list of buckets – each one with a set of documents that “belong” to it.
2. Metric
– Aggregations that keep track and compute metrics over a set of documents.
Different kinds of aggregation is listed below:
1.Min Aggregation
2.Max Aggregation
3.Sum Aggregation
4.Avg Aggregation
5.Stats Aggregation
6.Extended Stats Aggregation
7.Value Count Aggregation
8.Percentiles Aggregation
9.Percentile Ranks Aggregation
10.Cardinality Aggregation
11.Geo Bounds Aggregation
12.Top hits Aggregation
13.Scripted Metric Aggregation
14.Global Aggregation
15.Filter Aggregation
16.Filters Aggregation
17.Missing Aggregation
18.Nested Aggregation
19.Reverse nested Aggregation
20.Children Aggregation
21.Terms Aggregation
22.Significant Terms Aggregation
23.Range Aggregation
24.Date Range Aggregation
25.IPv4 Range Aggregation
26.Histogram Aggregation
27.Date Histogram Aggregation
28.Geo Distance Aggregation
29.GeoHash grid Aggregation


Custom Partitioner / Combiner In hadoop

Hadoop Provides support for defining Custom Partitioner & Combiner for writing mapreduce jobs. The use case of Combiner & Partitioner is different. Below are some useful information regarding partitioner & Combiner.

– The partitioning phase takes place after the map phase and before the reduce phase.
– The number of partitions is equal to the number of reducers. The data gets partitioned across the reducers according to the partitioning function
– The difference between a partitioner and a combiner is that the partitioner divides the data according to the number of reducers so that all the data in a single partition gets executed by a single reducer.
– However, the combiner functions similar to the reducer and processes the data in each partition. The combiner is an optimization to the reducer.
– The default partitioning function is the hash partitioning function where the hashing is done on the key.However it might be useful to partition the data according to some other function of the key or the value.

Suppose that we want to partition data based on each gender & Age group which is <20 , 20-50 , 50+. We can write the following MapReduce jobs.
Output:Partition – 0: (this partition contains the maximum scorers for each gender whose age is less than 20)
Nancyage- 7femalescore-98
Adamage- 9malescore-37Partition – 1: (this partition contains the maximum scorers for each gender whose age is between 20 and 50)Kristineage- 38femalescore-53
Bobage- 34malescore-89Partition – 2: (this partition contains the maximum scorers for each gender whose age is greater than 50)Monicaage- 56femalescore-92
Chrisage- 67malescore-97Mapper class:package HadoopGTK;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
* @author Bhavesh Gadoya
public class SampleMapper extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split(“\t”);
String gender = tokens[2];
String nameAgeScore = tokens[0] + “\t” + tokens[1] + “\t” + tokens[3];
context.write(new Text(gender), new Text(nameAgeScore));
public void run(Context context) throws IOException, InterruptedException {
while (context.nextKeyValue()) {
System.out.println(context.getCurrentKey()+” , “+context.getCurrentValue());
map(context.getCurrentKey(), context.getCurrentValue(), context);
}Reducer Class

package HadoopGTK;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/* *
* @author:Bhavesh Gadoya
public class SampleReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable values<Text>, Context context) throws IOException, InterruptedException {
int maxScore = Integer.MIN_VALUE;
String name = " ";
String age = " ";
String gender = " ";
int score = 0;
for (Text val : values) {
String[] valTokens = val.toString().split("\\t");
score = Integer.parseInt(valTokens[2]);
if (score > maxScore) {
name = valTokens[0];
age = valTokens[1];
gender = key.toString();
maxScore = score;
context.write(new Text(name), new Text("age- " + age + "\t" + gender + "\tscore-" + maxScore));

Partitioner class:

package HadoopGTK;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
* @author Bhavesh Gadoya
public class AgePartitioner extends Partitioner<Text, Text>

public int getPartition(Text key, Text value, int numReduceTasks) {
String[] nameAgeScore = value.toString().split(“\t”);
String age = nameAgeScore[1];
int ageInt = Integer.parseInt(age);
if(ageInt <=20 && ageInt > 0) {
return 0;
if (ageInt > 20 && ageInt <= 50) {
return 1;
} else {
return 2;

Driver class:

package HadoopGTK;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
* @author Bhavesh Gadoya
* Description : This program illustrates how to use a custom partitioner in a MapReduce program.
* input format: Name age gender score
* Hadoop version used : 0.20.2


public class SamplePartitioner extends Configured implements Tool {
// the driver to execute two jobs and invoke the map/reduce functions

// org.slf4j.Logger logger;
public int run(String[] args) throws Exception {
assignPartition(args[0], args[1]);
return 0;

// method to get job configuration for the customized partitioning MapReduce program
//executing the job to run the mapper, custom partitioner and the reducer
private void assignPartition(String inputPath, String outputPath) throws Exception {
Job job = new Job();
Configuration conf = new Configuration();
String input = inputPath;
String output = outputPath;
FileSystem fs = FileSystem.getLocal(job.getConfiguration());
Path opPath = new Path(output);
fs.delete(opPath, true);
FileInputFormat.setInputPaths(job, new Path(input)); // setting
FileOutputFormat.setOutputPath(job, new Path(output)); // setting

public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new SamplePartitioner(), args);
if (args.length != 2) {
System.err.println(“Usage: SamplePartitioner “);

Using nginx to provide authentication to Elasticsearch / Kibana

Friends authentication & authorization is always an important requirement for development of any application.

In this post i am going to show you how to provide authentication to elasticsearch / kibana using Nginx server.

Steps are given below:

1. Install the nginx server

You can follow below given link for reference.


2. Create the configuration file as kibana.conf or elasticsearch.con under /etc/nginx/conf.d (under configuration directory)

3. Add the following code to kibana.conf

server {
listen 80;
server_name yourdomain.com; ## Replace with your domain name
location / {
auth_basic "Restricted";
auth_basic_user_file /etc/nginx/conf.d/kibana.htpasswd;
proxy_pass http://yourdomain.com:5601; ## Replace with your kibana instance as kibana runs on 5601 for ES use port number 9200

4. Create kibana.htpasswd file under /etc/nginx/conf.d directory

5. Run the following command to generate the username / password for authentication

sudo htpasswd -c /etc/nginx/conf.d/kibana.htpasswd bhavesh

It will ask for password to set for username bhavesh. Enter it. It will store the generated username password in respected file colon seperated in encrypted form.

6. Restart the nginx

7. Point your browser to yourdomain.com & verify.

The above steps will provide authorization. One can provide authorization based on indices using kibana shield plugin. Otherwise you can follow below link describing tricks for MultiRole Authorization