Monday, May 26, 2014

Step-by-step instructions to write a MapReduce program using Java

Step-by-step instructions to write and run a MapReduce program using Java with the Eclipse IDE on BOSS/Debian OS.

1) Open Eclipse. This assumes you have already set up Eclipse for Hadoop; if not, follow the setup instructions first.

2) Change the perspective (Window --> Open Perspective) to MapReduce.

3) Create a new project (File --> New --> Project) and choose the project type MapReduce.

4) Give the project the name "LogAnalyserSyntaxError". (If this is the first time, configure the Hadoop path by choosing the path up to $HADOOP_HOME; otherwise choose the "use default Hadoop" option.)

Sample error log file; we will extract only the errors caused by syntax problems:
 
2011-02-15 19:28:56 ERROR  : ERROR:  column "education_qual.degree" must appear in the GROUP BY clause or be used in an aggregate function
2011-02-15 19:30:44 ERROR  : ERROR:  server closed the connection unexpectedly
2011-02-15 19:31:13 ERROR  : ERROR:  canceling statement due to user request
2011-02-15 19:31:24 ERROR  : ERROR:  syntax error at or near "order"
 

5) Create the Mapper, Reducer and Driver classes (for simplicity, I'm creating all of these classes in a single file).


Mapper Class
 
public static class MapSyntax extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private Text match = new Text("syntax");

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            // emit (word, 1) only when the token matches "syntax"
            if (word.equals(match)) {
                output.collect(word, one);
            }
        }
    }
}
 


The above class:
   A) Accepts input (key, value) as LongWritable and Text, and produces output (key, value) as Text and IntWritable.

   B) Finds the given string "syntax" in each line and passes the matches to the reducer class.
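The mapper's matching logic can be sketched in plain Java, without the Hadoop types, to see what it does to one of the sample log lines above (`countMatches` is a hypothetical helper name, not part of the job):

```java
import java.util.StringTokenizer;

public class MapperSketch {
    // Standalone sketch of the mapper's per-line logic: split the line on
    // whitespace and count tokens equal to the search word. In the real
    // mapper, each hit emits (word, 1) via output.collect instead.
    static int countMatches(String line, String match) {
        int count = 0;
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            if (tokenizer.nextToken().equals(match)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        String line = "2011-02-15 19:31:24 ERROR  : ERROR:  syntax error at or near \"order\"";
        System.out.println(countMatches(line, "syntax"));  // prints 1
    }
}
```

Note that this matches whole whitespace-separated tokens only, which is exactly why the mapper finds "syntax" in the last sample line but not, say, inside a longer word.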


Reducer Class
 
public static class ReduceSyntax extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        // emit the total count for this word
        output.collect(key, new IntWritable(sum));
    }
}
 

The above class:
   A) Accepts input (key, value) from the Mapper class as Text and IntWritable, and produces output (key, value) as Text and IntWritable.

   B) Counts the total number of occurrences of the given error type 'syntax' and produces the result as a (key, value) pair.
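The reducer's summing step can likewise be sketched in plain Java with ordinary ints standing in for the IntWritable values it receives (`sumCounts` is a hypothetical helper name):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReducerSketch {
    // Standalone sketch of the reducer: iterate over all counts emitted by
    // the mappers for one key and add them up. The real reducer calls
    // values.next().get() and emits the total via output.collect(key, sum).
    static int sumCounts(Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    public static void main(String[] args) {
        // three mappers each matched "syntax" once, so three 1s arrive
        List<Integer> ones = Arrays.asList(1, 1, 1);
        System.out.println(sumCounts(ones.iterator()));  // prints 3
    }
}
```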

Driver Class 
 
public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(LogAnalyserSyntaxError.class);
    conf.setJobName("ErrorDetails");

    // Output types returned from the Reducer class
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    // Specify the Mapper, Combiner and Reducer classes
    conf.setMapperClass(MapSyntax.class);
    conf.setCombinerClass(ReduceSyntax.class);
    conf.setReducerClass(ReduceSyntax.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    // Default location of the input path is hdfs://localhost:9000/user/root/input
    // (i.e. /user/HADOOP_USER/input); all files under the input dir will be processed.
    // The output will be stored in hdfs://localhost:9000/user/root/output
    FileInputFormat.setInputPaths(conf, new Path("input"));
    FileOutputFormat.setOutputPath(conf, new Path("output"));

    JobClient.runJob(conf);
}
 
 A) Make sure you have an input directory under /user/<your Hadoop user>/ (in my case the Hadoop user is root, so the paths are /user/root/input and /user/root/output).

B) By default, all files under the input directory will be processed.

Find below the complete class file:

 
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
        
public class LogAnalyserSyntaxError {
 
public static class MapSyntax extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private Text match = new Text("syntax");

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            // emit (word, 1) only when the token matches "syntax"
            if (word.equals(match)) {
                output.collect(word, one);
            }
        }
    }
}
 
public static class ReduceSyntax extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        // emit the total count for this word
        output.collect(key, new IntWritable(sum));
    }
}
 
public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(LogAnalyserSyntaxError.class);
    conf.setJobName("ErrorDetails");

    // Output types returned from the Reducer class
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    // Specify the Mapper, Combiner and Reducer classes
    conf.setMapperClass(MapSyntax.class);
    conf.setCombinerClass(ReduceSyntax.class);
    conf.setReducerClass(ReduceSyntax.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    // Default location of the input path is hdfs://localhost:9000/user/root/input
    // (i.e. /user/HADOOP_USER/input); all files under the input dir will be processed.
    // The output will be stored in hdfs://localhost:9000/user/root/output
    FileInputFormat.setInputPaths(conf, new Path("input"));
    FileOutputFormat.setOutputPath(conf, new Path("output"));

    JobClient.runJob(conf);
}
}
 


6) Run the project to execute the MapReduce job.

7) The next screen will ask you to select a Hadoop location; choose the "select existing server from list below" option and press Continue.

8) The output will be stored in the hdfs://localhost:9000/user/root/output location.
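Since the job uses TextOutputFormat, each line of the output file (typically named part-00000) holds the key and value separated by a tab, e.g. "syntax" followed by its total count. A small sketch of reading such a line (`parseCount` is a hypothetical helper, and the count 4 is an illustrative value, not a result from the sample log above):

```java
public class OutputLineSketch {
    // TextOutputFormat writes "key<TAB>value" per line, so splitting on the
    // tab recovers the word and its count.
    static int parseCount(String line) {
        String[] parts = line.split("\t");
        return Integer.parseInt(parts[1]);
    }

    public static void main(String[] args) {
        System.out.println(parseCount("syntax\t4"));  // prints 4
    }
}
```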