Thursday, 10 December 2015

Faster way to count number of lines in a file/dir using Map Reduce Framework


On this site you can see one way to count the number of lines in a file.
That approach emits a count of one for each record in each map task. So if one map task holds 10,000 lines, 10,000 values are passed to the reducer. With more than one mapper, the number of intermediate writes grows accordingly.
Let's reduce the intermediate writes.

Below is an optimized way to count the number of lines in a file or directory.
The changes are in:
1. Mapper
Instead of emitting 'one' for each record, we increment a line count in map() and emit it once in the cleanup() phase.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LineCntMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {

 Text keyEmit = new Text("Total Lines");
 IntWritable valEmit = new IntWritable();
 int partialSum = 0;

 // Count lines locally instead of emitting one pair per line
 @Override
 public void map(LongWritable key, Text value, Context context) {
  partialSum++;
 }

 // Runs once per map task after all its records are processed:
 // emit a single ("Total Lines", partialSum) pair
 @Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  valEmit.set(partialSum);
  context.write(keyEmit, valEmit);
 }
}
So if we have 5 map tasks, only 5 intermediate key-value pairs are emitted.
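To see the difference in intermediate traffic without a cluster, here is a plain-Java simulation (the class and method names are hypothetical, not part of Hadoop). Each inner list stands for the records of one input split, i.e. one map task; the sketch counts the pairs the per-record approach would emit against the single partial sum per task that the cleanup() approach emits.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Hadoop needed): each inner list holds the
// lines seen by one map task.
class InMapperCombiningDemo {

    // Per-record approach: one ("Total Lines", 1) pair per line.
    static int naivePairCount(List<List<String>> splits) {
        int pairs = 0;
        for (List<String> split : splits) {
            pairs += split.size();
        }
        return pairs;
    }

    // Optimized approach: each map task counts locally and emits one
    // partial sum from cleanup(); returns the emitted values.
    static List<Integer> partialSums(List<List<String>> splits) {
        List<Integer> emitted = new ArrayList<>();
        for (List<String> split : splits) {
            int partialSum = 0;
            for (String line : split) {
                partialSum++;              // what map() does per record
            }
            emitted.add(partialSum);       // what cleanup() emits once
        }
        return emitted;
    }

    // The reducer adds up whatever values arrive for the key.
    static int reduce(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> splits = List.of(
            List.of("a", "b", "c"),
            List.of("d", "e"),
            List.of("f", "g", "h", "i"));
        List<Integer> sums = partialSums(splits);
        System.out.println("per-record pairs: " + naivePairCount(splits));
        System.out.println("partial-sum pairs: " + sums.size());
        System.out.println("total lines: " + reduce(sums));
    }
}
```

With these three splits, the per-record approach emits 9 pairs while the partial-sum approach emits 3, and both add up to 9 lines.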

2. Driver
In the driver we also set a combiner:
job.setMapperClass(LineCntMapper.class);
job.setCombinerClass(LineCntReducer.class);
job.setReducerClass(LineCntReducer.class);
The combiner does nothing more than the reducer does (sum the counts), so we can use the reducer itself as the combiner.
The reducer doesn't need any change.
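Reusing the reducer as a combiner is safe here because addition is associative and commutative, and (assuming the reducer, like the mapper, works with Text/IntWritable pairs) its input and output types match. Hadoop may run the combiner zero, one or several times on arbitrary subsets of the map output, so the final total must not depend on the grouping. The plain-Java sketch below (hypothetical names, illustrative numbers) checks exactly that.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: the reducer's logic (sum all values for a key) used as a
// combiner on arbitrary groups of map outputs, then once more as the reducer.
class CombinerSafetyDemo {

    // Same logic whether it runs as combiner or as reducer.
    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    // Each inner list is one subset of map outputs the combiner happened to see.
    static int combineThenReduce(List<List<Integer>> groups) {
        List<Integer> combined = new ArrayList<>();
        for (List<Integer> group : groups) {
            combined.add(sum(group));   // combiner: one partial sum per group
        }
        return sum(combined);           // reducer over the combined values
    }

    public static void main(String[] args) {
        // Partial counts from five map tasks (illustrative numbers).
        List<Integer> mapOutputs = List.of(120, 80, 200, 50, 50);
        int direct = sum(mapOutputs);
        int combined = combineThenReduce(List.of(
            List.of(120, 80), List.of(200), List.of(50, 50)));
        System.out.println(direct + " == " + combined);
    }
}
```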

If you run this code, you will get the result faster than with the code mentioned earlier on this site.

Working code is here

Happy Hadooping........

DoubleArrayWritable in Hadoop


Let's see how to emit double arrays from a mapper and process them in a reducer.

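DoubleArrayWritable class
package mywritable;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;

// Top-level class so it can be imported as mywritable.DoubleArrayWritable.
// The no-arg constructor fixes the element type to DoubleWritable.
public class DoubleArrayWritable extends ArrayWritable {
 public DoubleArrayWritable() {
  super(DoubleWritable.class);
 }
}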
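Why the subclass? Hadoop instantiates Writable classes by reflection through a no-argument constructor, and ArrayWritable has none; its readFields() also has to know which class to create for each element. DoubleArrayWritable fixes that class to DoubleWritable. The plain-Java sketch below mirrors, as I understand it, the byte layout such an array uses when serialized: an int element count followed by each DoubleWritable's 8-byte double. The class name is hypothetical and nothing here calls Hadoop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-Java sketch of an array-of-doubles wire format:
// int length, then one 8-byte double per element.
class DoubleArrayWireFormat {

    static byte[] write(double[] values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(values.length);      // element count first
            for (double v : values) {
                out.writeDouble(v);           // each element's own payload
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
        return bytes.toByteArray();
    }

    // Reading back only works because the element type (double) is known
    // up front, which is the role DoubleWritable.class plays in super().
    static double[] read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            double[] values = new double[in.readInt()];
            for (int i = 0; i < values.length; i++) {
                values[i] = in.readDouble();
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = write(new double[] {1.5, 2.0, -3.25});
        System.out.println(data.length + " bytes");
        System.out.println(java.util.Arrays.toString(read(data)));
    }
}
```

Three doubles take 4 + 3 * 8 = 28 bytes in this layout.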

Driver()
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(DoubleArrayWritable.class);

job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class); // MyReducer below emits Text values

map()
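import java.io.IOException;

import mywritable.DoubleArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends
Mapper<Object, Text, IntWritable, DoubleArrayWritable> {

 @Override
 public void map(Object key, Text value, Context context)
   throws IOException, InterruptedException {
  //Do something............
  // (size, the contents of arr and mykey come from your own logic here)
  double[] arr = new double[size];

  // Wrap each double in a DoubleWritable and hand the array to the writable
  DoubleArrayWritable arrWritable = new DoubleArrayWritable();
  DoubleWritable[] data = new DoubleWritable[size];
  for (int k = 0; k < size; k++) {
   data[k] = new DoubleWritable(arr[k]);
  }
  arrWritable.set(data);

  context.write(mykey, arrWritable);
 }
}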

reduce()
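import java.io.IOException;

import mywritable.DoubleArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends
Reducer<IntWritable, DoubleArrayWritable, NullWritable, Text> {

 @Override
 public void reduce(IntWritable key,
   Iterable<DoubleArrayWritable> values, Context context)
   throws IOException, InterruptedException {

  double[] sum = new double[size];
  for (DoubleArrayWritable c : values) {
    // get() returns Writable[]; each element is a DoubleWritable
    Writable[] temp = c.get();
    for (int i = 0; i < size; i++) {
         sum[i] += ((DoubleWritable) temp[i]).get();
    }
  }

  //Do something and emit values ..................
  // (out and emit come from your own logic, e.g. out = NullWritable.get())

  context.write(out, new Text(emit));
 }
}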

Happy Hadooping.......

Wednesday, 9 December 2015

Partitioning Data Using Hadoop MultipleOutputs

There may be cases where we need to partition our data based on a certain condition.
For example, consider this employee data:
EmpId,EmpName,Age,Gender,Salary
1201,gopal,45,Male,50000
1202,manisha,40,Female,51000
1203,khaleel,34,Male,30000
1204,prasanth,30,Male,31000
1205,kiran,20,Male,40000
1206,laxmi,25,Female,35000
1207,bhavya,20,Female,15000
1208,reshma,19,Female,14000
1209,kranthi,22,Male,22000
1210,Satish,24,Male,25000
1211,Krishna,25,Male,26000
1212,Arshad,28,Male,20000
1213,lavanya,18,Female,8000
Let's assume one condition.
We need to separate the above data based on Gender (there can be other scenarios).
The expected outcome looks like this:
Female

1213,lavanya,18,Female,8000
1202,manisha,40,Female,51000
1206,laxmi,25,Female,35000
1207,bhavya,20,Female,15000
1208,reshma,19,Female,14000
Male

1211,Krishna,25,Male,26000
1212,Arshad,28,Male,20000
1201,gopal,45,Male,50000
1209,kranthi,22,Male,22000
1210,Satish,24,Male,25000
1203,khaleel,34,Male,30000
1204,prasanth,30,Male,31000
1205,kiran,20,Male,40000
This can be achieved using MultipleOutputs in Hadoop.
As the name suggests, MultipleOutputs writes output data to multiple outputs (files).

Let's see how to implement this.
Driver Class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PartitionerDriver extends Configured implements Tool {

 public static void main(String[] args) throws Exception {
  int res = ToolRunner.run(new Configuration(), new PartitionerDriver(), args);
  System.exit(res);
 }

 @Override
 public int run(String[] args) throws Exception {
  if (args.length != 3) {
   System.err
     .println("Usage:File Partitioner <input> <output> <delimiter> ");
   return 1;
  }
  System.out.println("Partitioning File Based on Gender...........");

  /*
   * Arguments
   */
  String source = args[0];
  String dest = args[1];
  String delimiter = args[2];

  // Use the configuration prepared by ToolRunner; mappers read "delimiter" from it
  Configuration conf = getConf();
  conf.set("delimiter", delimiter);

  Path in = new Path(source);
  Path out = new Path(dest);

  /*
   * Delete output dir if exist
   */
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(out)) {
   fs.delete(out, true);
  }

  // The Job copies conf, so every conf.set() must happen before this line
  Job job0 = Job.getInstance(conf, "Partition Records");
  job0.setJarByClass(PartitionerDriver.class);
  job0.setMapperClass(PartitionMapper.class);
  job0.setReducerClass(PartitionReducer.class);
  job0.setMapOutputKeyClass(Text.class);
  job0.setMapOutputValueClass(Text.class);
  // The reducer writes only through MultipleOutputs, with NullWritable keys
  job0.setOutputKeyClass(NullWritable.class);
  job0.setOutputValueClass(Text.class);
  FileInputFormat.addInputPath(job0, in);
  FileOutputFormat.setOutputPath(job0, out);

  boolean success = job0.waitForCompletion(true);
  if (success) {
   System.out.println("Successfully partitioned Data based on Gender!");
  }
  return success ? 0 : 1;
 }
}


Mapper Class
The mapper class gets each record and splits it using the delimiter.
The key is the Gender and the value is the whole record.

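import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PartitionMapper extends Mapper<LongWritable, Text, Text, Text> {

 Text keyEmit = new Text();

 @Override
 protected void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {

  Configuration conf = context.getConfiguration();
  String delim = conf.get("delimiter");
  String line = value.toString();

  // split() treats the delimiter as a regular expression; "," works as-is
  String[] record = line.split(delim);
  // Gender is the 4th field (index 3): it becomes the key, the record the value
  keyEmit.set(record[3]);
  context.write(keyEmit, value);
 }
}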

Reducer Class
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PartitionReducer extends Reducer<Text, Text, NullWritable, Text> {

 MultipleOutputs<NullWritable, Text> mos;
 NullWritable out = NullWritable.get();

 @Override
 protected void setup(Context context) {
  mos = new MultipleOutputs<NullWritable, Text>(context);
 }

 @Override
 public void reduce(Text key, Iterable<Text> values, Context context)
   throws IOException, InterruptedException {
  for (Text value : values) {
   // Third argument is the base output path: records land in <key>-r-nnnnn
   mos.write(out, value, key.toString());
  }
 }

 @Override
 protected void cleanup(Context context)
   throws IOException, InterruptedException {
  // Closing MultipleOutputs flushes and closes every per-key file
  mos.close();
 }
}

Here we define MultipleOutputs as
MultipleOutputs<NullWritable, Text> mos;
Our files don't need a key; we are only interested in the data. So the key is NullWritable and the value is Text.
We have a setup() method where we initialize our MultipleOutputs.
mos = new MultipleOutputs<NullWritable, Text>(context);
Let's look at reduce().
As we know, the reducer groups values by key. The key from the mapper is Gender, and we have 2 genders, Male and Female, so the reducer receives 2 keys, each followed by its values.
for (Text value : values) {
 mos.write(out, value, key.toString());
}
MultipleOutputs.write() takes 3 arguments:
1. key
2. value and
3. file name (the base output path).
Here our key is NullWritable, the value is each record for the key, and we name the file after the key.
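The whole map-shuffle-reduce flow can be simulated in plain Java to check the expected partitions before running on a cluster. This is a sketch with hypothetical names, not Hadoop code: a TreeMap stands in for the sorted shuffle, and the "-r-00000" suffix assumes a single reducer (partition 0). The record order inside each real output file is not guaranteed; the simulation simply keeps input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the gender partitioning job (no Hadoop needed).
class GenderPartitionDemo {

    // Mapper: key = field at index 3 (Gender), value = whole record.
    // Reducer: append each record to a "file" named <key>-r-00000.
    static Map<String, List<String>> partition(List<String> records, String delim) {
        Map<String, List<String>> files = new TreeMap<>();  // keys sorted, like reducer input
        for (String record : records) {
            String gender = record.split(delim)[3];
            String fileName = gender + "-r-00000";
            files.computeIfAbsent(fileName, name -> new ArrayList<>()).add(record);
        }
        return files;
    }

    // The employee records from the example above.
    static List<String> sampleEmployees() {
        return List.of(
            "1201,gopal,45,Male,50000",
            "1202,manisha,40,Female,51000",
            "1203,khaleel,34,Male,30000",
            "1204,prasanth,30,Male,31000",
            "1205,kiran,20,Male,40000",
            "1206,laxmi,25,Female,35000",
            "1207,bhavya,20,Female,15000",
            "1208,reshma,19,Female,14000",
            "1209,kranthi,22,Male,22000",
            "1210,Satish,24,Male,25000",
            "1211,Krishna,25,Male,26000",
            "1212,Arshad,28,Male,20000",
            "1213,lavanya,18,Female,8000");
    }

    public static void main(String[] args) {
        partition(sampleEmployees(), ",").forEach((file, rows) ->
            System.out.println(file + " -> " + rows.size() + " records"));
    }
}
```

For the sample data this yields Female-r-00000 with 5 records and Male-r-00000 with 8.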

Once you run this code, you will see two files in the output directory:
1. Female-r-00000
2. Male-r-00000
Let's see the contents.

You can find the code here.

Happy Hadooping..............

Monday, 4 May 2015

Hadoop Word Count Using C Language - Hadoop Streaming


Prerequisites
1. Hadoop (this example is based on the Cloudera distribution, CDH5)
2. gcc compiler

Hadoop streaming is a utility that comes with the Hadoop distribution. The utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer. 

We need 2 programs, mapper.c and reducer.c. You can find the code on GitHub.
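The C sources themselves are on GitHub and not reproduced here. What any streaming mapper and reducer must follow is a simple contract: read lines from standard input, write key<TAB>value lines to standard output, and expect the reducer's input to arrive sorted by key. As an illustration only (this is not the author's mapper.c or reducer.c, the class name is hypothetical, and splitting words on whitespace is an assumption), here is that word-count logic sketched in Java; a real streaming executable would wrap map() and reduce() in a loop over System.in.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the Hadoop Streaming word-count contract.
// Mapper: one input line -> "word\t1" lines.
// Reducer: lines sorted by key -> one "word\tcount" line per distinct word.
class StreamingWordCountSketch {

    static List<String> map(String line) {
        List<String> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {              // blank lines produce nothing
                out.add(word + "\t1");
            }
        }
        return out;
    }

    // Input must already be sorted by key, as the framework guarantees.
    static List<String> reduce(List<String> sortedLines) {
        List<String> out = new ArrayList<>();
        String current = null;
        int count = 0;
        for (String line : sortedLines) {
            String[] kv = line.split("\t", 2);
            if (current != null && !current.equals(kv[0])) {
                out.add(current + "\t" + count);  // key changed: flush previous word
                count = 0;
            }
            current = kv[0];
            count += Integer.parseInt(kv[1].trim());
        }
        if (current != null) {
            out.add(current + "\t" + count);      // flush the last word
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> mapped = new ArrayList<>(map("hello hadoop hello"));
        mapped.addAll(map("hadoop streaming"));
        Collections.sort(mapped);                 // stands in for shuffle and sort
        reduce(mapped).forEach(System.out::println);
    }
}
```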

1. Compile mapper.c and reducer.c
hadoop@namenode2:~/hadoopstreaming$ gcc -o mapper.out  mapper.c
hadoop@namenode2:~/hadoopstreaming$ gcc -o reducer.out  reducer.c
hadoop@namenode2:~/hadoopstreaming$ ls
mapper.c  mapper.out  reducer.c  reducer.out
Here you can see 2 executables mapper.out and reducer.out.

2. Place your wordcount input file in HDFS
hadoop@namenode2:~$ hadoop fs -put /home/hadoop/wc /
hadoop@namenode2:~$ hadoop fs -ls /
drwxr-xr-x   - hadoop hadoop         0 2015-05-04 15:50 /wc

3. Now we will run our C programs on the cluster with the help of the Hadoop Streaming jar.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.3.1.jar \
-files hadoopstreaming/mapper.out -mapper hadoopstreaming/mapper.out \
-files hadoopstreaming/reducer.out -reducer hadoopstreaming/reducer.out \
-input /wc -output /wordcount-out

For Apache Hadoop
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar \
-files hadoopstreaming/mapper.out -mapper hadoopstreaming/mapper.out \
-files hadoopstreaming/reducer.out -reducer hadoopstreaming/reducer.out \
-input /wc -output /wordcount-out

Run the Job
hadoop@namenode2:~$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.3.1.jar -files hadoopstreamingtrail/mapper.out -mapper hadoopstreamingtrail/mapper.out -files hadoopstreaming/reducer.out -reducer hadoopstreaming/reducer.out -input /wc -output /wordcount-out
packageJobJar: [hadoopstreaming/mapper.out, hadoopstreaming/reducer.out] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.5.0-cdh5.3.1.jar] /tmp/streamjob7616955264406618684.jar tmpDir=null
15/05/04 15:50:28 INFO mapred.FileInputFormat: Total input paths to process : 2
15/05/04 15:50:28 INFO mapreduce.JobSubmitter: number of splits:3
15/05/04 15:50:28 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1426753134244_0119
15/05/04 15:50:29 INFO impl.YarnClientImpl: Submitted application application_1426753134244_0119
15/05/04 15:50:29 INFO mapreduce.Job: Running job: job_1426753134244_0119
15/05/04 15:50:37 INFO mapreduce.Job:  map 0% reduce 0%
15/05/04 15:50:46 INFO mapreduce.Job:  map 67% reduce 0%
15/05/04 15:50:47 INFO mapreduce.Job:  map 100% reduce 0%
15/05/04 15:50:53 INFO mapreduce.Job:  map 100% reduce 100%
15/05/04 15:50:55 INFO mapreduce.Job: Job job_1426753134244_0119 completed successfully

4. Let's see the results
hadoop@namenode2:~$ hadoop fs -ls  /wordcount-out
Found 2 items
-rw-r--r--   3 hadoop hadoop          0 2015-05-04 15:50 /wordcount-out/_SUCCESS
-rw-r--r--   3 hadoop hadoop      11685 2015-05-04 15:50 /wordcount-out/part-00000


Happy Hadooping

Wednesday, 21 January 2015

K-Nearest Neighbors Algorithm - KNN

KNN is a classification algorithm that can be used in many applications such as image processing, statistical pattern recognition and data mining.

As with any classification algorithm, KNN has a model part and a prediction part. Here the model is simply the input (training) dataset. The output of prediction is a class membership: an object is classified by a majority vote of its neighbors, being assigned to the class most common among its k nearest neighbors (k is a positive integer, typically small).
1.  If k = 1, the object is simply assigned to the class of that single nearest neighbor.
2.  If k = 3 and the class labels of the three neighbors are Good = 2 and Bad = 1, the predicted class label is Good, which has the majority vote.

Let's see how KNN handles some sample data.

We have data from a questionnaire survey and objective testing, with two attributes, to classify whether a special paper tissue is good or not.

Here is the training sample.

Let this be the test sample.

1. Determine the parameter k, the number of nearest neighbors.
      Say k = 3.
2. Calculate the distance between the query instance and all the training samples.

The coordinates of the query instance are (3, 7). Instead of calculating the distance, we compute the squared distance, which is faster to calculate (no square root) and ranks the samples in the same order.

3. Sort the distances and determine the nearest neighbors based on the k-th minimum distance.

4. Gather the category Y of the nearest neighbors.

-> In the last column, the category (Y) of the second row is not included because the rank of this data is greater than 3 (= k).

5. Use the simple majority of the categories of the nearest neighbors as the prediction value of the query instance.

We have 2 Good and 1 Bad. Since 2 > 1, we conclude that a new paper tissue that passes the laboratory test with x1 = 3 and x2 = 7 belongs to the Good category.
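The five steps translate directly into code. Below is a minimal Java sketch (hypothetical class and method names). The original training table was an image that is not preserved here, so the four training rows in exampleTraining() are assumed values chosen for illustration only; with them, the query (3, 7) and k = 3 give two Good neighbours and one Bad, matching the conclusion above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal k-nearest-neighbours classifier for two numeric attributes,
// using squared Euclidean distance and a simple majority vote.
class KnnSketch {

    static final class Sample {
        final double x1;
        final double x2;
        final String label;

        Sample(double x1, double x2, String label) {
            this.x1 = x1;
            this.x2 = x2;
            this.label = label;
        }
    }

    // Step 2: squared distance; skipping the square root keeps the same ranking.
    static double squaredDistance(Sample s, double q1, double q2) {
        double d1 = s.x1 - q1;
        double d2 = s.x2 - q2;
        return d1 * d1 + d2 * d2;
    }

    // Steps 3-5: sort by distance, keep the k nearest, return the majority label.
    static String classify(List<Sample> training, double q1, double q2, int k) {
        List<Sample> sorted = new ArrayList<>(training);
        sorted.sort(Comparator.comparingDouble((Sample s) -> squaredDistance(s, q1, q2)));

        Map<String, Integer> votes = new HashMap<>();
        for (Sample s : sorted.subList(0, Math.min(k, sorted.size()))) {
            votes.merge(s.label, 1, Integer::sum);
        }

        // Ties (possible with an even k) are broken arbitrarily here.
        String best = null;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (best == null || e.getValue() > votes.get(best)) {
                best = e.getKey();
            }
        }
        return best;
    }

    // Hypothetical training rows (x1, x2, label), not the original table.
    static List<Sample> exampleTraining() {
        return List.of(
            new Sample(7, 7, "Bad"),
            new Sample(7, 4, "Bad"),
            new Sample(3, 4, "Good"),
            new Sample(1, 4, "Good"));
    }

    public static void main(String[] args) {
        System.out.println(classify(exampleTraining(), 3, 7, 3));
    }
}
```

With these assumed rows the squared distances to (3, 7) are 16, 25, 9 and 13; the row at distance 25 ranks 4th and is left out, just as the second row is in the walkthrough above.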



Thursday, 1 January 2015

Cloudera Certified Hadoop Developer (CCD - 410)


I cleared the Cloudera Certified Hadoop Developer (CCD-410) examination on December 31st, 2014, and received the certificate from Cloudera the very next day.

If you are planning to take this certification, you need to know Hadoop in depth and have hands-on experience too.

I started my Hadoop career with my MCA (2010 - 2013) final-year major project, and it led me to my current job. Around 6 months after joining, I planned to take the Cloudera certification exam. I had around 1+ year of experience in Hadoop, having learned and practised it myself. I thought it would be nice to attend the training session to see if I had missed any of the pointers. I registered with Cloudera and attended the Cloudera Hadoop training in Bangalore. It ran from 27th to 30th March 2014 at the Ibis Hotel, Bangalore. It was my first trip to Bangalore and I was a little bit tense.

There were 13 trainees including me, and I was the only lady among them. Allan Schweitz was our trainer and Vipin Nahal from OssCube assisted him.

The 4-day training helps you know the Hadoop framework in depth; it also covers the Hadoop ecosystem projects and includes hands-on assignments. The 4 days of class were really good and informative. If you already know Hadoop well, it may feel like a waste of time, but you can still clarify your doubts.

At the end of the 4th day we received a training certificate and had a group pic.



After the training we received a 180-day subscription to the Cloudera Official Developer Practice Test for CCDH. This self-assessment helps you discover strengths and weaknesses in your understanding and skills around Apache Hadoop, and prepares you across the entire range of topics covered in a Cloudera certification exam.

Finally, on 31st December 2014, I appeared for the Hadoop certification exam and cleared CCD-410 successfully.

There were around 52 questions in total. The options all looked easy, but the answers seemed similar and tricky. Here is my certificate.



Advice to pass the certification examination
  1. Please don't depend on cheating (dump) sites; most of their answers are wrong. At most, view some sample dumps from them.
  2. Go through Hadoop: The Definitive Guide.
  3. Gather good knowledge of the Hadoop ecosystem projects.
  4. If you attend Cloudera training, you will receive the 180-day practice test subscription mentioned above. Practise with it; if you get an overall grade greater than 75%, you will surely pass the examination.
  5. You will also get questions on ecosystem projects (Hive, Sqoop, Flume, ...) and programming questions related to MapReduce. The programming questions are all output prediction.
Details For Cloudera Certification

1. Exam Code: CCD-410
    Number of Questions: 50 - 55 live questions
    Time Limit: 90 minutes
    Passing Score: 70%
    Language: English, Japanese
    Price: USD $295, AUD $300, EUR €215, GBP £185, JPY ¥28,500

2. You can log on to Cloudera's Pearson VUE site to register for your certification test.
    First you need to set up your profile on the Cloudera Pearson VUE site. Once you have registered, you will see a link to register for the exam, and you can then choose the date and location. It will then take you to the payment options, where you pay for your certification exam.

All the very best!!

For further information, queries or doubts regarding Hadoop, you can contact me.


Thursday, 11 December 2014

Computing Median In Hive

In statistics and probability theory, the median is the numerical value separating the higher half of a data sample, a population, or a probability distribution, from the lower half.

The median is the central point of a data set.

Consider the following data points: 1,4,5,6,7
The Median is "5".

Let's see how to find the median in Hive.

Consider a "test" table.
+------+-----+
| Name | Age |
+------+-----+
| A    | 23  |
| B    | 23  |
| C    | 20  |
+------+-----+
hive> select * from test;
OK
A 23
B 23
C 20
Time taken: 4.219 seconds, Fetched: 3 row(s)

Let's say we are going to find the median of the Age column in the "test" table.
Our expected median is "23".

The PERCENTILE(BIGINT col, p) function computes the exact p-th percentile of a column in Hive. The 50th percentile (p = 0.5) is the median.

Structure of  "test" table
hive> desc test;      
OK
firstname            string                                   
age                  int                                      
Time taken: 0.32 seconds, Fetched: 2 row(s)

Here we can see that the column we want the median of (age) is an INT. We need to convert it to BIGINT.

Let's try the query:
select percentile(cast(age as BIGINT), 0.5) from test;
Here we cast the age column to BIGINT.
hive> select percentile(cast(age as BIGINT), 0.5) from test1; 
Query ID = aibladmin_20141211140606_c61cb042-ed14-4048-8270-4cea1eece1c7 
Total jobs = 1 
Launching Job 1 out of 1 
.
.
OK 
23.0 
Time taken: 27.659 seconds, Fetched: 1 row(s)
23.0 is the expected result, the median of [23, 23, 20] (sorted: 20, 23, 23).
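To see why 23.0 comes out, here is a small Java sketch of an exact percentile: sort the values, take the position p * (n - 1), and interpolate linearly between the two nearest ranks. My understanding is that Hive's percentile UDAF computes it this way, but treat that as an assumption; the class name is hypothetical. For p = 0.5 this is the median, and with an even number of values it averages the two middle ones.

```java
import java.util.Arrays;

// Exact percentile with linear interpolation between the closest ranks.
class PercentileSketch {

    static double percentile(long[] values, double p) {
        if (values.length == 0) {
            throw new IllegalArgumentException("no values");
        }
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        double position = p * (sorted.length - 1);   // 0-based rank, may be fractional
        int lower = (int) Math.floor(position);
        int upper = (int) Math.ceil(position);
        double fraction = position - lower;
        return sorted[lower] + fraction * (sorted[upper] - sorted[lower]);
    }

    public static void main(String[] args) {
        System.out.println(percentile(new long[] {23, 23, 20}, 0.5));
        System.out.println(percentile(new long[] {1, 4, 5, 6, 7}, 0.5));
    }
}
```

For the Age column the sorted values are 20, 23, 23, the position is 0.5 * 2 = 1, and the value at that rank is 23, printed as 23.0 because the result is a double.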