LAB: Map Reduce Function-Average
- Dataset: Stock_Price_Data/stock_price.txt
- The dataset contains stock price data collected for every minute.
- Find the average stock price.
Solution
Check whether Hadoop is already running:
jps
Start Hadoop if not started already
start-all.sh
You can also use start-dfs.sh (starts only HDFS, without YARN)
Is the Hadoop started now? jps
check your files on hdfs
hadoop fs -ls /
Bring the data onto hadoop HDFS
hadoop fs -copyFromLocal /home/hduser/datasets/Stock_Price_Data/stock_price.txt /stock_price
Check the data file on HDFS
hadoop fs -ls /
check your current working directory
cd
Goto hadoop bin
cd /usr/local/hadoop/bin/
It is important to make your PWD (present working directory) $hadoop/bin
Open an editor with a file name AvgMapred.java
sudo gedit AvgMapred.java
Copy the below java code, paste in your file and save your file
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AvgMapred {
/*
* data schema(tab separated) : 2000.16325 4664654.78955 46513123.134165
*/
public static class MapperClass extends
Mapper<LongWritable, Text, Text, FloatWritable> {
public void map(LongWritable key, Text empRecord, Context con)
throws IOException, InterruptedException {
String[] word = empRecord.toString().split("\\n");
String flg = " ";
try {
Float salary = Float.parseFloat(word[0]);
con.write(new Text(flg), new FloatWritable(salary));
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static class ReducerClass extends
Reducer<Text, FloatWritable, Text, Text> {
public void reduce(Text key, Iterable<FloatWritable> valueList,
Context con) throws IOException, InterruptedException {
try {
Double total = (double) 0;
int count = 0;
for (FloatWritable var : valueList) {
total += var.get();
//System.out.println("reducer " + var.get());
count++;
}
Double avg = (double) total / count;
String out = "Total: " + total + " :: " + "Average: " + avg;
con.write(key, new Text(out));
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Configuration conf = new Configuration();
try {
Job job = Job.getInstance(conf, "FindAverageAndTotalSalary");
job.setJarByClass(AvgMapred.class);
job.setMapperClass(MapperClass.class);
job.setReducerClass(ReducerClass.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
To compile this program, use the below command
hadoop com.sun.tools.javac.Main AvgMapred.java
Create the jar file which is named as avg.jar
jar cf avg.jar AvgMapred*.class
Run the average program; the output will be automatically routed to the HDFS output directory given as the second argument
hadoop jar avg.jar AvgMapred /stock_price /usr/stock_price_out1
Part of the output from above line in Terminal looks like below :
Have a look at the output
hadoop fs -cat /usr/stock_price_out1/part-r-00000
We can take the output to a text file
hadoop fs -cat /usr/stock_price_out1/part-r-00000 >> /home/hduser/Output/stock_price_out.txt