Saturday, May 17, 2014

Running a MapReduce Job



WordCount is a simple application that counts the number of occurrences of each word in an input set.

1.       Create the input directory in HDFS.
# useradd cloudera
$ sudo su hdfs                                                                                    
$ hadoop fs -mkdir /user/cloudera
$ hadoop fs -chown cloudera /user/cloudera
$ exit
$ sudo su - cloudera
$ pwd
/home/cloudera
$ hadoop fs -mkdir /user/cloudera/wordcount /user/cloudera/wordcount/input

2.       Create sample text files and copy the files into HDFS under the input directory.
$ echo "Hello World Bye World" > file0
$ echo "Hello Hadoop Goodbye Hadoop" > file1
$ hadoop fs -put file* /user/cloudera/wordcount/input

3.       Create a Java program.
$ vi WordCount.java

package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    // Mapper: emits (word, 1) for every whitespace-separated token in an input line.
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    // Reducer: sums the counts collected for each word. Also used as the combiner.
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        // Types of the keys and values written by the job.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // args[0] is the HDFS input directory, args[1] the output directory.
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
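
The tokenizing loop in the mapper can be tried on its own before involving a cluster. The following is a minimal plain-Java sketch, not part of the job: the class name `TokenizeDemo` and the method `mapLine` are hypothetical, and plain strings stand in for Hadoop's `Text` and `IntWritable`. It illustrates that `StringTokenizer` splits only on whitespace, so punctuation and letter case stay part of the word.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class TokenizeDemo {

    // Mimics the loop in Map.map(): one "word<TAB>1" entry per whitespace-separated token.
    // Strings replace the Hadoop Text/IntWritable pair (an assumption of this sketch).
    public static List<String> mapLine(String line) {
        List<String> emitted = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            emitted.add(tokenizer.nextToken() + "\t1");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Contents of file0 from step 2: four tokens, "World" emitted twice.
        for (String pair : mapLine("Hello World Bye World")) {
            System.out.println(pair);
        }
        // Case and punctuation are kept, so this line yields three distinct words.
        System.out.println(mapLine("World world World,").size());
    }
}
```

Because the mapper does no normalization, real text usually needs lowercasing and punctuation stripping before the counts are meaningful.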

4.       Compile WordCount.java.
$ mkdir wordcount_classes
$ javac -cp "/opt/cloudera/parcels/CDH/lib/hadoop/*:/opt/cloudera/parcels/CDH/lib/hadoop/client-0.20/*" -d wordcount_classes WordCount.java

5.       Create a JAR.
$ jar -cvf wordcount.jar -C wordcount_classes/ .
added manifest
adding: org/(in = 0) (out= 0)(stored 0%)
adding: org/myorg/(in = 0) (out= 0)(stored 0%)
adding: org/myorg/WordCount$Map.class(in = 1938) (out= 798)(deflated 58%)
adding: org/myorg/WordCount$Reduce.class(in = 1611) (out= 649)(deflated 59%)
adding: org/myorg/WordCount.class(in = 1546) (out= 749)(deflated 51%)

6.       Run the application.
$ hadoop jar wordcount.jar org.myorg.WordCount /user/cloudera/wordcount/input /user/cloudera/wordcount/output
14/02/22 19:36:52 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/02/22 19:36:53 INFO mapred.FileInputFormat: Total input paths to process : 2
14/02/22 19:36:58 INFO mapred.JobClient: Running job: job_201402221622_0001
14/02/22 19:37:00 INFO mapred.JobClient:  map 0% reduce 0%
14/02/22 19:39:07 INFO mapred.JobClient:  map 33% reduce 0%
14/02/22 19:39:31 INFO mapred.JobClient:  map 67% reduce 0%
14/02/22 19:39:32 INFO mapred.JobClient:  map 100% reduce 0%
14/02/22 19:39:43 INFO mapred.JobClient:  map 100% reduce 100%
14/02/22 19:39:50 INFO mapred.JobClient: Job complete: job_201402221622_0001
14/02/22 19:39:50 INFO mapred.JobClient: Counters: 33
14/02/22 19:39:51 INFO mapred.JobClient:   File System Counters
14/02/22 19:39:51 INFO mapred.JobClient:     FILE: Number of bytes read=79
14/02/22 19:39:51 INFO mapred.JobClient:     FILE: Number of bytes written=651887
14/02/22 19:39:51 INFO mapred.JobClient:     FILE: Number of read operations=0
14/02/22 19:39:51 INFO mapred.JobClient:     FILE: Number of large read operations=0
14/02/22 19:39:51 INFO mapred.JobClient:     FILE: Number of write operations=0
14/02/22 19:39:51 INFO mapred.JobClient:     HDFS: Number of bytes read=413
14/02/22 19:39:51 INFO mapred.JobClient:     HDFS: Number of bytes written=41
14/02/22 19:39:51 INFO mapred.JobClient:     HDFS: Number of read operations=7
14/02/22 19:39:51 INFO mapred.JobClient:     HDFS: Number of large read operations=0
14/02/22 19:39:51 INFO mapred.JobClient:     HDFS: Number of write operations=2
14/02/22 19:39:51 INFO mapred.JobClient:   Job Counters
14/02/22 19:39:51 INFO mapred.JobClient:     Launched map tasks=3
14/02/22 19:39:51 INFO mapred.JobClient:     Launched reduce tasks=1
14/02/22 19:39:51 INFO mapred.JobClient:     Data-local map tasks=3
14/02/22 19:39:51 INFO mapred.JobClient:     Total time spent by all maps in occupied slots (ms)=210815
14/02/22 19:39:51 INFO mapred.JobClient:     Total time spent by all reduces in occupied slots (ms)=10176
14/02/22 19:39:51 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/02/22 19:39:51 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/02/22 19:39:51 INFO mapred.JobClient:   Map-Reduce Framework
14/02/22 19:39:51 INFO mapred.JobClient:     Map input records=2
14/02/22 19:39:51 INFO mapred.JobClient:     Map output records=8
14/02/22 19:39:51 INFO mapred.JobClient:     Map output bytes=82
14/02/22 19:39:51 INFO mapred.JobClient:     Input split bytes=360
14/02/22 19:39:51 INFO mapred.JobClient:     Combine input records=8
14/02/22 19:39:51 INFO mapred.JobClient:     Combine output records=6
14/02/22 19:39:51 INFO mapred.JobClient:     Reduce input groups=5
14/02/22 19:39:51 INFO mapred.JobClient:     Reduce shuffle bytes=117
14/02/22 19:39:51 INFO mapred.JobClient:     Reduce input records=6
14/02/22 19:39:51 INFO mapred.JobClient:     Reduce output records=5
14/02/22 19:39:51 INFO mapred.JobClient:     Spilled Records=12
14/02/22 19:39:51 INFO mapred.JobClient:     CPU time spent (ms)=2630
14/02/22 19:39:51 INFO mapred.JobClient:     Physical memory (bytes) snapshot=566894592
14/02/22 19:39:51 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=2479079424
14/02/22 19:39:51 INFO mapred.JobClient:     Total committed heap usage (bytes)=280698880
14/02/22 19:39:51 INFO mapred.JobClient:   org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
14/02/22 19:39:51 INFO mapred.JobClient:     BYTES_READ=50
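
Several of the Map-Reduce Framework counters in the log can be checked by hand against the two sample files. The sketch below is a plain-Java simulation, not Hadoop code: the class `CounterCheck` and its methods are hypothetical names, and it assumes that each input file is read by a single map task whose output passes through the combiner exactly once. Under that assumption it arrives at the same figures as the log: 8 map output records, 6 combine output records, and 5 reduce input groups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class CounterCheck {

    // Contents of file0 and file1 from step 2.
    static final String[] FILES = {
        "Hello World Bye World",
        "Hello Hadoop Goodbye Hadoop"
    };

    // Map phase for one file: one record per whitespace-separated token.
    public static List<String> mapTokens(String contents) {
        List<String> tokens = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(contents);
        while (tokenizer.hasMoreTokens()) {
            tokens.add(tokenizer.nextToken());
        }
        return tokens;
    }

    // Combiner for one map task: sums the 1s per word within that task only.
    public static TreeMap<String, Integer> combine(List<String> tokens) {
        TreeMap<String, Integer> counts = new TreeMap<>();
        for (String token : tokens) {
            counts.merge(token, 1, Integer::sum);
        }
        return counts;
    }

    // "Map output records": total tokens emitted across all files.
    public static int mapOutputRecords() {
        int total = 0;
        for (String file : FILES) {
            total += mapTokens(file).size();
        }
        return total;
    }

    // "Combine output records": distinct words per file, summed over files.
    public static int combineOutputRecords() {
        int total = 0;
        for (String file : FILES) {
            total += combine(mapTokens(file)).size();
        }
        return total;
    }

    // Reduce phase: merges the per-file partial sums. The number of keys
    // corresponds to "Reduce input groups" and "Reduce output records".
    public static TreeMap<String, Integer> reduce() {
        TreeMap<String, Integer> result = new TreeMap<>();
        for (String file : FILES) {
            for (Map.Entry<String, Integer> e : combine(mapTokens(file)).entrySet()) {
                result.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("Map output records=" + mapOutputRecords());
        System.out.println("Combine output records=" + combineOutputRecords());
        System.out.println("Reduce input groups=" + reduce().size());
        System.out.println(reduce());
    }
}
```

The combiner shrinks 8 records to 6 rather than 5 because `Hello` occurs once in each file, and a combiner only merges records within one map task; the reducer performs the final merge.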

7.       View the results of running the job by selecting Activities > mapreduce1 Jobs.

8.       Examine the output.
$ hadoop fs -cat /user/cloudera/wordcount/output/part-00000
Bye	1
Goodbye	1
Hadoop	2
Hello	2
World	2
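
Each line of the part file is a key and a value separated by a tab, which is the default separator of `TextOutputFormat`. If the counts are needed in another program, they can be read back with a few lines of Java. This is a minimal sketch under that assumption: the class `OutputParser` and the method `parse` are hypothetical, and the sample string stands in for the text returned by `hadoop fs -cat`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OutputParser {

    // Parses "word<TAB>count" lines into a map that keeps the file's order.
    // Blank lines are skipped; a line without a tab is rejected.
    public static Map<String, Integer> parse(String partFileContents) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : partFileContents.split("\n")) {
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] fields = line.split("\t", 2);
            if (fields.length < 2) {
                throw new IllegalArgumentException("Expected key<TAB>value: " + line);
            }
            counts.put(fields[0], Integer.parseInt(fields[1].trim()));
        }
        return counts;
    }

    public static void main(String[] args) {
        // Stand-in for the contents of part-00000 shown above.
        String sample = "Bye\t1\nGoodbye\t1\nHadoop\t2\nHello\t2\nWorld\t2\n";
        System.out.println(parse(sample));
    }
}
```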

9.       Remove the output directory so that you can run the sample again.   
$ hadoop fs -rm -r /user/cloudera/wordcount/output
Moved: 'hdfs://myhost2.example.com:8020/user/cloudera/wordcount/output' to trash at: hdfs://myhost2.example.com:8020/user/hdfs/.Trash/Current
