Wednesday, 23 April 2014

Hadoop WordCount Example In Detail


Every programming language has a "Hello World" program. Likewise, Hadoop has its own "Hello World" program: the WordCount example.


/*
 * import Statements
 */
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Unmesha SreeVeni U.B
 *
 */
public class WordCount {
 /*
  * The Map class extends the Mapper base class.
  * It takes four type arguments:
  * key/value input and
  * key/value output types.
  * Key Input: LongWritable (byte offset of the line in the input file)
  * Value Input: Text (each line in the file)
  * 
  * Key Output: Text (each word in the file)
  * Value Output: IntWritable (1)
  * 
  * Input line: qwerty the rose the
  * Input key/value: 234 qwerty the rose the
  * Output key/value: qwerty 1, the 1, rose 1, the 1
  */
 public static class Map extends
   Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   /*
    * Get the value (one line of the file) into the line variable. A
    * StringTokenizer splits the line into words, and each word is
    * emitted as the key with 1 as the value.
    */
   String line = value.toString();
   // line = "qwerty the rose the"
   StringTokenizer tokenizer = new StringTokenizer(line);
   while (tokenizer.hasMoreTokens()) {
    word.set(tokenizer.nextToken());
    context.write(word, one);
    /*
     * qwerty 1 
     * the 1 
     * rose 1 
     * the 1
     */
   }
  }
 }

 /*
  * In between, shuffle and sort take place. After the map phase there is a
  * shuffle and sort phase. The shuffle groups the values of each unique
  * key into a single list.
  * e.g. if one map() emits
  * qwerty 1
  * the 1
  * rose 1
  * the 1
  * 
  * then after the shuffle the output will be
  * qwerty,[1]
  * the,[1,1]
  * rose,[1]
  * 
  * and the keys are sorted as well, so the input to the Reducer is each
  * unique key with its list of values:
  * qwerty,[1]
  * rose,[1]
  * the,[1,1]
  */
 public static class Reduce extends
   Reducer<Text, IntWritable, Text, IntWritable> {
  /*
   * The Reduce class extends the Reducer base class.
   * It takes four type arguments:
   * key/value input and key/value output types.
   * Key Input: Text (unique key from the mapper)
   * Value Input: IntWritable (list of values)
   *  
   * Key Output: Text (each unique word)
   * Value Output: IntWritable (count of each word)
   * 
   * Input key/value: 
   * qwerty,[1] 
   * rose,[1] 
   * the,[1,1] 
   * 
   * Output key/value:
   * qwerty,1 
   * rose,1 
   * the,2
   */
  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
    /*
     * Text key: the unique word; Iterable<IntWritable> values: the list
     * of values for that key. For the,[1,1] the key is "the" and the
     * Iterable holds [1,1].
     */
   int sum = 0;
   for (IntWritable val : values) {
    sum += val.get();
   }
   context.write(key, new IntWritable(sum));
   /*
    * qwerty,1 
    * rose,1 
    * the,2
    */
  }
 }

 /*
  * main (driver) method, which contains all the configuration needed to
  * set up the MapReduce job
  */
 public static void main(String[] args) throws Exception {

  /*
   * creating a configuration object
   */
  Configuration conf = new Configuration();
  Job job = new Job(conf, "wordcount");
  job.setJarByClass(WordCount.class);

  /*
   * what are the values of key/value output type from mapper
   */
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);

  /*
   * what are the values of key/value output type from Reducer
   */
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  /*
   * specify Mapper class and Reducer class
   */
  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  /*
   * Set the input and output formats. The default is TextInputFormat,
   * where each record is a line terminated with '\n'.
   */
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  /*
   * Set the input directory and the output directory. The output
   * directory must not already exist.
   */
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  /*
   * Submit the job and wait for its completion
   */
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }

}
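
Since the word counts are simply summed, the same Reduce class can optionally also be registered as a combiner, so partial sums are computed on the map side before the shuffle. This is not part of the listing above, only a common optional tweak you would add in the driver next to job.setReducerClass(Reduce.class):

  /*
   * Optional: combine partial counts on the map side before the shuffle.
   * Safe here because addition is associative and commutative.
   */
  job.setCombinerClass(Reduce.class);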

Happy Hadooping . . .

Tuesday, 22 April 2014

Chaining Jobs in Hadoop MapReduce


When a workflow contains more than one MapReduce job, the question arises: how do you manage the jobs so they are executed in order? There are several approaches; one such approach is shown below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author Unmesha SreeVeni U.B
 * 
 */
public class ChainJobs extends Configured implements Tool {

 private static final String OUTPUT_PATH = "intermediate_output";

 @Override
 public int run(String[] args) throws Exception {
  /*
   * Job 1
   */
  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);
  Job job = new Job(conf, "Job1");
  job.setJarByClass(ChainJobs.class);

  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

  job.waitForCompletion(true);

  /*
   * Job 2
   */
  Configuration conf2 = getConf();
  Job job2 = new Job(conf2, "Job 2");
  job2.setJarByClass(ChainJobs2.class);

  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);

  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);

  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));

  return job2.waitForCompletion(true) ? 0 : 1;
 }

 /**
  * Reads the arguments from the command line and runs the jobs till
  * completion.
  */
 public static void main(String[] args) throws Exception {
  if (args.length != 2) {
   System.err.println("Enter valid number of arguments <Inputdirectory> <Outputlocation>");
   System.exit(1);
  }
  int exitCode = ToolRunner.run(new Configuration(), new ChainJobs(), args);
  System.exit(exitCode);
 }
}

Command to run MapReduce Job


hadoop jar chainjob.jar /hdfspath/to/input /hdfspath/to/output

First Job Configuration


  /*
   * Job 1
   */
  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);
  Job job = new Job(conf, "Job1");
  job.setJarByClass(ChainJobs.class);

  job.setMapperClass(MyMapper1.class);
  job.setReducerClass(MyReducer1.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job, new Path(args[0]));
  TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

  job.waitForCompletion(true);


Second Job Configuration

  /*
   * Job 2
   */
  Configuration conf2 = getConf();
  Job job2 = new Job(conf2, "Job 2");
  job2.setJarByClass(ChainJobs.class);

  job2.setMapperClass(MyMapper2.class);
  job2.setReducerClass(MyReducer2.class);

  job2.setOutputKeyClass(Text.class);
  job2.setOutputValueClass(Text.class);

  job2.setInputFormatClass(TextInputFormat.class);
  job2.setOutputFormatClass(TextOutputFormat.class);

  TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
  TextOutputFormat.setOutputPath(job2, new Path(args[1]));

  return job2.waitForCompletion(true) ? 0 : 1;

OUTPUT_PATH is the intermediate output folder of the first job, and it acts as the input for the second job. The final output is written to args[1].


args[0] : Input path
OUTPUT_PATH : Intermediate output from job1
args[1] : Output path
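
If you do not want the intermediate folder to be left behind, one option (a sketch, not part of the original listing) is to delete it after the second job finishes, reusing the FileSystem handle that run() already creates, for example by replacing the last line of run() with:

  boolean success = job2.waitForCompletion(true);
  /* Remove the intermediate directory once Job 2 has consumed it */
  fs.delete(new Path(OUTPUT_PATH), true);
  return success ? 0 : 1;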

Happy Hadooping . . .

Calculating min and max of columns in a csv file - Hadoop MapReduce


The code below shows how to find the minimum and maximum values of each column in a CSV file.
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Unmesha SreeVeni U.B
 */
public class ColumnAggregator {

 public static class ColMapper extends
   Mapper<Object, Text, Text, DoubleWritable> {
  /*
   * Emits the column id as the key and the column element as the value
   */
  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   String[] cols = value.toString().split(",");
   for (int i = 0; i < cols.length; i++) { 
    context.write(new Text(String.valueOf(i + 1)),new DoubleWritable(Double.parseDouble(cols[i])));
   }

  }
 }

 public static class ColReducer extends
   Reducer<Text, DoubleWritable, Text, DoubleWritable> {
  /*
   * Reducer finds min and max of each column
   */

  public void reduce(Text key, Iterable<DoubleWritable> values,
    Context context) throws IOException, InterruptedException {
   double min = Double.MAX_VALUE, max = -Double.MAX_VALUE; // safe even when all values are negative
   Iterator<DoubleWritable> iterator = values.iterator(); //Iterating 
   while (iterator.hasNext()) {
    double value = iterator.next().get();
    if (value < min) { //Finding min value
     min = value;
    }
    if (value > max) { //Finding max value
     max = value;
    }
   }
   context.write(new Text(key), new DoubleWritable(min));
   context.write(new Text(key), new DoubleWritable(max));
  }
 }
 public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();

  Job job = new Job(conf, "Min and Max");
  job.setJarByClass(ColumnAggregator.class);
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(new Path(args[1]))) {
   fs.delete(new Path(args[1]), true);
  }
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(DoubleWritable.class);

  job.setMapperClass(ColMapper.class);
  job.setReducerClass(ColReducer.class);

  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

Explanation
Mapper:
map() receives the file offset and each line as a key/value pair. It generates an id for each column and emits that id together with the column value to the Reducer.
Reducer:
reduce() receives each column id and its list of values as a key/value pair, finds the min and max for each key, and emits the column id as the key with the min and max as values.


Here, if only one reducer is used, all the stress of finding the min and max falls on that single Reducer. A better idea is to do part of the work in the Mapper.

A Mapper also has setup() and cleanup() methods: setup() executes before all map() calls and cleanup() executes after all of them.

So it is better to track the running min and max inside map() and emit them from cleanup():
map()
{
 /* no emit; just update the running min and max for each column */
}
cleanup()
{
 emit(colId, (min, max))
}


Again, in the reducer, we find the overall min and max from those partial results:
reduce()
{
 emit(colId, (min, max))
}


Now the Reducer only needs to compare a handful of partial min and max values (one pair per mapper per column), which greatly reduces the load on the reducer. A minimal Java sketch of this pattern is shown below.
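
The sketch below keeps the same (Text, DoubleWritable) output types as ColMapper, but emits the local min and the local max as two separate values under the same column key, so the existing ColReducer can be reused unchanged (the pseudocode above emits them as a pair instead). The class name ColMinMaxMapper and the HashMap bookkeeping are illustrative; it additionally needs import java.util.HashMap; and import java.util.Map;.

 public static class ColMinMaxMapper extends
   Mapper<Object, Text, Text, DoubleWritable> {
  /* Running min and max per column id, accumulated across all map() calls */
  private Map<Integer, Double> colMin = new HashMap<Integer, Double>();
  private Map<Integer, Double> colMax = new HashMap<Integer, Double>();

  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   String[] cols = value.toString().split(",");
   for (int i = 0; i < cols.length; i++) {
    double v = Double.parseDouble(cols[i]);
    Double curMin = colMin.get(i + 1);
    if (curMin == null || v < curMin) {
     colMin.put(i + 1, v);
    }
    Double curMax = colMax.get(i + 1);
    if (curMax == null || v > curMax) {
     colMax.put(i + 1, v);
    }
   }
   /* No emit here */
  }

  protected void cleanup(Context context) throws IOException,
    InterruptedException {
   /* Emit each column's local min and max once per mapper */
   for (Integer colId : colMin.keySet()) {
    context.write(new Text(String.valueOf(colId)),
      new DoubleWritable(colMin.get(colId)));
    context.write(new Text(String.valueOf(colId)),
      new DoubleWritable(colMax.get(colId)));
   }
  }
 }

The driver would then call job.setMapperClass(ColMinMaxMapper.class) instead of ColMapper.class.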

Happy Hadooping.

Monday, 21 April 2014

Code For Deleting Output Folder If Exist In Hadoop MapReduce Jobs




Most Hadoop MapReduce jobs operate with two arguments: an input directory and an output directory.

Each time we run a MapReduce job we need to supply a non-existing folder as the output path. So while we are doing trial and error with our MR jobs, it is convenient if the output folder is deleted automatically whenever it already exists.

Here is the code for that:
/* Provides access to configuration parameters */
Configuration conf = new Configuration();
/* Create a FileSystem object from the configuration */
FileSystem fs = FileSystem.get(conf);
/* Check whether the output path (args[1]) exists */
if (fs.exists(new Path(args[1]))) {
   /* If it exists, delete it recursively */
   fs.delete(new Path(args[1]), true);
}

Wrong import statements in Hadoop MapReduce Job




It is quite common for Hadoop beginners to see a ClassCastException like the one below in their MR jobs.

java.lang.Exception: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
 at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
 at java.lang.Class.asSubclass(Class.java:3037)
 at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:819)
 at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:836)
 at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:376)
 at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:85)
 at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:584)
 at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:656)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
 at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
 at java.util.concurrent.FutureTask.run(FutureTask.java:166)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:722)

While checking the MR code we will not find any compile errors, but it still throws a ClassCastException at runtime.

When Eclipse detects a problem in your code, it displays an error or warning icon along the left edge, known as the gutter. Hovering over the icon pops up a description of the problem, and accepting the suggested import fix without reading it can pull in the wrong statement, which is exactly what happened here.

The job fails because the wrong Text class was imported by mistake (the class named in the stack trace):

import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

The correct import statement is

import org.apache.hadoop.io.Text;

Now the MR job executes with no exception.

Happy Hadooping 





How to get the IP address from a weblog record

Example input :

  96.7.4.14 - - [24/Apr/2011:04:20:11 -0400] "GET /cat.jpg HTTP/1.1" 200 12433

String[] fields = value.toString().split(" ");
if (fields.length > 0) {
 String ip = fields[0];
}

 The String variable "ip" now holds the IP address.
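
For context, here is a minimal sketch of a Mapper that uses this snippet to count requests per IP address. The class name LogIpMapper and the counting logic are illustrative additions, not part of the original post:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogIpMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 private final static IntWritable one = new IntWritable(1);
 private Text ipText = new Text();

 public void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {
  /* The IP address is the first space-separated field of the log line */
  String[] fields = value.toString().split(" ");
  if (fields.length > 0) {
   String ip = fields[0];
   ipText.set(ip);
   context.write(ipText, one); // emit (ip, 1), ready for a summing reducer
  }
 }
}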

Tuesday, 8 April 2014

Name node is in safe mode - How to leave





The NameNode enters safe mode in unusual situations, for example when the disk is full, and also during the start-up phase. To leave safe mode, run the command below.

bin/hadoop dfsadmin -safemode leave


After running the above command, run hadoop fsck so that any inconsistencies that have crept into HDFS can be sorted out.
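
For example, to check the whole filesystem namespace:

hadoop fsck /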

Use the hdfs command instead of the hadoop command on newer distributions:
hdfs dfsadmin -safemode leave

hadoop dfsadmin has been deprecated; all HDFS-related tasks have been moved to the separate hdfs command.