1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| package hadoop; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import scala.reflect.generic.Trees.New; public class WordCount { public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>{ private static final IntWritable Number = new IntWritable(1); private Text word = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer stringTokenizer = new StringTokenizer(value.toString()); while(stringTokenizer.hasMoreTokens()){ String string = stringTokenizer.nextToken(); word.set(string); context.write(word, Number); } } } public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> vlaues, Context context) throws IOException, InterruptedException { int num=0; for(IntWritable intWritable:vlaues){ num+=intWritable.get(); } context.write(key, new IntWritable(num)); } } public static class WordCountMapper1 extends Mapper<Object, Text, Text, IntWritable>{ private static final IntWritable Number = new IntWritable(1); private Text word = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer stringTokenizer = new StringTokenizer(value.toString()); while(stringTokenizer.hasMoreTokens()){ String string = stringTokenizer.nextToken(); word.set(string); context.write(word, Number); } } } public static class WordCountReduce1 extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> vlaues, Context context) throws IOException, InterruptedException { int num=0; for(IntWritable intWritable:vlaues){ num+=intWritable.get(); } context.write(key, new IntWritable(num)); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); String[] argsValues = new GenericOptionsParser(conf, args).getRemainingArgs(); JobControl jobControl = new JobControl("jobcontrol"); Job job = new Job(conf, "word count1"); job.setJarByClass(WordCount.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPaths(job, argsValues[0]); FileOutputFormat.setOutputPath(job, new Path(argsValues[1])); Job job2 = new Job(conf, "word count2"); job2.setJarByClass(WordCount.class); job2.setMapperClass(WordCountMapper1.class); job2.setReducerClass(WordCountReduce1.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPaths(job2, argsValues[1]); FileOutputFormat.setOutputPath(job2, new Path(argsValues[2])); ControlledJob controlledJob = new ControlledJob(conf); controlledJob.setJob(job); ControlledJob controlledJob2 = new ControlledJob(conf); controlledJob2.setJob(job2); controlledJob2.addDependingJob(controlledJob); jobControl.addJob(controlledJob); jobControl.addJob(controlledJob2); Thread thread = new Thread(jobControl); thread.start(); while(true){ if(jobControl.allFinished()){ System.out.println(jobControl.getSuccessfulJobList()); jobControl.stop(); break; } } } }
|