Wednesday 23 April 2014

Hadoop WordCount Example In Detail


For any programming language there is a "Hello World" program. Like wise for Hadoop also there is a "Hello World" program - WordCount Example.


/*
 * import Statements
 */
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Unmesha SreeVeni U.B
 *
 */
public class WordCount {
 /*
  * Map class extends Mapper Base Class 
  * Four arguments 
  * key/Value input and
  * key Value Output Types 
  * Key Input: LongWritable (Line offset of input file) 
  * Value Input: Text (Each line in a file)
  * 
  * Key Output : Text (Each word in a file) 
  * Value Output : IntWritable (1)
  * 
  * Input Line: qwerty the rose the 
  * Input Key/Value : 234 qwerty the rose the
  * Output key/Value : qwerty 1 the 1 rose 1 the 1
  */
 public static class Map extends
   Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   /*
    * Getting each value(each line of a file) in line variable. Using
    * stringTokenizer splits each word in a line and emit each word as
    * key and 1 as value
    */
   String line = value.toString();
   // line = "qwerty the rose the"
   StringTokenizer tokenizer = new StringTokenizer(line);
   while (tokenizer.hasMoreTokens()) {
    word.set(tokenizer.nextToken());
    context.write(word, one);
    /*
     * qwerty 1 
     * the 1 
     * rose 1 
     * the 1
     */
   }
  }
 }

 /*
  * In between Shuffle and sort takes place. After each map() there will be a
  * shuffle and sort phase. Shuffle aggregates all the unique keys and
  * convert those values into a single list 
  * eg: if one map() emits 
  * qwerty 1
  * the 1 
  * rose 1 
  * the 1 
  * 
  * Then after shuffle output will be 
  * qwerty,[1] 
  * the,[1,1]
  * rose,[1]
  * 
  * and sorting is done after the completion of each Map() So the input to
  * Reducer will be unique key with list of values 
  * qwerty,[1] 
  * rose,[1]
  * the,[1,1]
  */
 public static class Reduce extends
   Reducer<Text, IntWritable, Text, IntWritable> {
  /*
   * Reducer need to extend the Reducer Base class 
   * Four arguments
   * key/Value input and key Value Output Types 
   * Key Input: Text (unique key from mapper)
   * Value Input: IntWritable (List of values)
   *  
   * Key Output: Text (each unique word) 
   * Value Input: IntWritable (count of each word)
   * 
   * Input key/Value : 
   * qwerty,[1] 
   * rose,[1] 
   * the,[1,1] 
   * 
   * Output Key/value :
   * qwerty,1 
   * rose,1 
   * the,2
   */
  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   /*
    * Text key :unique word and Iterable<IntWritable> values will be
    * list of values the,[1,1] key the Iterable Value [1,1]
    */
   int sum = 0;
   for (IntWritable val : values) {
    sum += val.get();
   }
   context.write(key, new IntWritable(sum));
   /*
    * qwerty,1 
    * rose,1 
    * the,2
    */
  }
 }

 /*
  * main or driver class which contains all the configuration to set up a
  * mapreduce job
  */
 public static void main(String[] args) throws Exception {

  /*
   * creating a configuration object
   */
  Configuration conf = new Configuration();
  Job job = new Job(conf, "wordcount");
  job.setJarByClass(WordCount.class);

  /*
   * what are the values of key/value output type from mapper
   */
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);

  /*
   * what are the values of key/value output type from Reducer
   */
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  /*
   * specify Mapper class and Reducer class
   */
  job.setMapperClass(Map.class);
  job.setReducerClass(Reduce.class);

  /*
   * Setting input format default is TextInputFormat each line terminated
   * with '\n'
   */
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);

  /*
   * Setting Input Directory and output Directory Output directory should
   * be a non existing one
   */
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  /*
   * waits for the completion of the job
   */
  job.waitForCompletion(true);
 }

}

Happy Hadooping . . .

4 comments:

  1. can you provide MapReduce Program to count the total volume done for each stock in csv file

    ReplyDelete
  2. Incredible! This blog looks just like my old one! It's on a completely different topic but it has pretty much the same page layout and design. Great choice of colors! machining edmonton

    ReplyDelete
  3. Awesome post. You Post is very informative. Thanks for Sharing.
    Hadoop Training in Noida

    ReplyDelete