Custom Partitioner / Combiner in Hadoop

Hadoop provides support for defining a custom Partitioner and a custom Combiner when writing MapReduce jobs. The two serve different purposes. Below is some useful information about partitioners and combiners.

– The partitioning phase takes place after the map phase and before the reduce phase.
– The number of partitions is equal to the number of reducers; the data is divided across the reducers according to the partitioning function.
– The difference between a partitioner and a combiner is that the partitioner divides the data according to the number of reducers, so that all the data in a single partition is processed by a single reducer.
– The combiner, on the other hand, works like the reducer and processes the data within each partition. The combiner is an optimization that cuts down the amount of data shuffled to the reducers.
– The default partitioning function is hash partitioning, where the hashing is done on the key (see the sketch below). However, it can be useful to partition the data according to some other function of the key or the value.
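For reference, Hadoop's default partitioner (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner) amounts to the following; the bitwise AND with Integer.MAX_VALUE clears the sign bit so the modulo always yields a valid, non-negative partition number:

import org.apache.hadoop.mapreduce.Partitioner;

// The default: hash the key and map the hash onto [0, numReduceTasks).
public class HashPartitioner<K, V> extends Partitioner<K, V> {

    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // mask off the sign bit so the result of % is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}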

Example:
Suppose that we want to partition the data by age group (<20, 20-50, 50+) and, within each partition, find the maximum scorer for each gender. We can write the following MapReduce job.
Input (tab-separated: name, age, gender, score):

Alice     23  female  45
Bob       34  male    89
Chris     67  male    97
Kristine  38  female  53
Connor    25  male    27
Daniel    78  male    95
James     34  male    79
Alex      52  male    69
Nancy     7   female  98
Adam      9   male    37
Jacob     7   male    23
Mary      6   female  93
Clara     87  female  72
Monica    56  female  92
Output:

Partition 0 (maximum scorers for each gender whose age is less than 20):
Nancy  age- 7   female  score-98
Adam   age- 9   male    score-37

Partition 1 (maximum scorers for each gender whose age is between 20 and 50):
Kristine  age- 38  female  score-53
Bob       age- 34  male    score-89

Partition 2 (maximum scorers for each gender whose age is greater than 50):
Monica  age- 56  female  score-92
Chris   age- 67  male    score-97

Mapper class:

package HadoopGTK;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
*
* @author Bhavesh Gadoya
*/
public class SampleMapper extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // input line format: name <tab> age <tab> gender <tab> score
        String[] tokens = value.toString().split("\t");
        // debug: print the gender token
        System.out.println(tokens[2]);
        String gender = tokens[2];
        // emit the gender as the key and "name <tab> age <tab> score" as the value
        String nameAgeScore = tokens[0] + "\t" + tokens[1] + "\t" + tokens[3];
        context.write(new Text(gender), new Text(nameAgeScore));
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            // debug: print each record before mapping it
            System.out.println(context.getCurrentKey() + " , " + context.getCurrentValue());
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}

Reducer class:

package HadoopGTK;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * @author Bhavesh Gadoya
 */
public class SampleReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // for each gender (the key), keep the record with the highest score
        int maxScore = Integer.MIN_VALUE;
        String name = "";
        String age = "";
        String gender = "";
        for (Text val : values) {
            // value format: name <tab> age <tab> score
            String[] valTokens = val.toString().split("\t");
            int score = Integer.parseInt(valTokens[2]);
            if (score > maxScore) {
                name = valTokens[0];
                age = valTokens[1];
                gender = key.toString();
                maxScore = score;
            }
        }
        context.write(new Text(name), new Text("age- " + age + "\t" + gender + "\tscore-" + maxScore));
    }
}
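
Combiner class (optional):

The title mentions combiners, but the example never defines one. A combiner must emit the same key/value types and format as the mapper output (here: a gender key and a "name <tab> age <tab> score" value), so SampleReducer cannot be reused as the combiner, since it writes a different value format. Below is a minimal sketch of a suitable combiner (the class name SampleCombiner is ours, not part of the original code); it forwards only the highest-scoring record per gender on the map side, which matches what the reducer computes, so running it zero or more times does not change the result. It would be registered in the driver with job.setCombinerClass(SampleCombiner.class).

package HadoopGTK;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Combiner sketch: forward only the highest-scoring record per gender.
 * Input and output types match the mapper output, as Hadoop requires.
 */
public class SampleCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int maxScore = Integer.MIN_VALUE;
        String best = null;
        for (Text val : values) {
            // value format: name <tab> age <tab> score
            int score = Integer.parseInt(val.toString().split("\t")[2]);
            if (score > maxScore) {
                maxScore = score;
                best = val.toString();
            }
        }
        if (best != null) {
            context.write(key, new Text(best));
        }
    }
}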

Partitioner class:

package HadoopGTK;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
*
* @author Bhavesh Gadoya
*/
public class AgePartitioner extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        // value format: name <tab> age <tab> score; partition on the age field
        String[] nameAgeScore = value.toString().split("\t");
        int ageInt = Integer.parseInt(nameAgeScore[1]);
        // age groups: under 20 -> partition 0, 20-50 -> partition 1, over 50 -> partition 2
        if (ageInt < 20) {
            return 0;
        }
        if (ageInt <= 50) {
            return 1;
        }
        return 2;
    }
}
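
A note on the code above: getPartition must return a value in the range [0, numReduceTasks), so AgePartitioner assumes the driver configures exactly three reduce tasks, as it does below. A defensive variant (our own sketch; the name SafeAgePartitioner is hypothetical) folds the age-group index onto the actual reducer count so the job cannot fail if it is configured with fewer reducers:

package HadoopGTK;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
// Sketch: same age groups as AgePartitioner, but the group index is
// taken modulo the actual reducer count so it is always in range.
public class SafeAgePartitioner extends Partitioner<Text, Text> {

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        int age = Integer.parseInt(value.toString().split("\t")[1]);
        int group = (age < 20) ? 0 : (age <= 50) ? 1 : 2;
        return group % numReduceTasks;
    }
}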


Driver class:

package HadoopGTK;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * @author Bhavesh Gadoya
 *
 * Description: illustrates how to use a custom partitioner in a MapReduce program.
 *
 * Input format: name <tab> age <tab> gender <tab> score
 *
 * Hadoop version used: 0.20.2
 */

public class SamplePartitioner extends Configured implements Tool {
    // the driver that configures and submits the job

    @Override
    public int run(String[] args) throws Exception {
        assignPartition(args[0], args[1]);
        return 0;
    }

    // builds the job configuration and runs the mapper, custom partitioner and reducer
    private void assignPartition(String inputPath, String outputPath) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource("/etc/hadoop/conf/core-site.xml");
        conf.addResource("/etc/hadoop/conf/hdfs-site.xml");
        // create the job with the loaded configuration
        Job job = new Job(conf, "AgePartitioner");
        job.setJarByClass(SamplePartitioner.class);
        job.setMapperClass(SampleMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(AgePartitioner.class);
        // three age groups -> three reducers, one per partition
        job.setNumReduceTasks(3);
        job.setReducerClass(SampleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // delete the output path if it already exists, on the job's configured filesystem
        FileSystem fs = FileSystem.get(job.getConfiguration());
        fs.delete(new Path(outputPath), true);
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        // validate the arguments before running the job
        if (args.length != 2) {
            System.err.println("Usage: SamplePartitioner <input path> <output path>");
            System.exit(2);
        }
        int res = ToolRunner.run(new SamplePartitioner(), args);
        System.exit(res);
    }
}
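
With the classes packaged into a jar (the jar name and paths below are placeholders), the job is submitted with the standard hadoop jar command. Since the job uses three reduce tasks, the output directory will contain one file per partition: part-r-00000, part-r-00001 and part-r-00002, corresponding to partitions 0, 1 and 2.

hadoop jar <your-jar> HadoopGTK.SamplePartitioner <input path> <output path>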
