Changeset 17
- Timestamp: 04/21/10 22:17:25 (2 years ago)
- Files:
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/hadoop-aggregate/src/main/java/st/happy_camper/hadoop/aggregate/Aggregator.java
r11 r17 20 20 import java.text.SimpleDateFormat; 21 21 import java.util.Date; 22 import java.util.Iterator;23 22 import java.util.Locale; 24 23 import java.util.regex.Matcher; … … 30 29 import org.apache.hadoop.io.LongWritable; 31 30 import org.apache.hadoop.io.Text; 32 import org.apache.hadoop.mapred.FileInputFormat; 33 import org.apache.hadoop.mapred.FileOutputFormat; 34 import org.apache.hadoop.mapred.JobClient; 35 import org.apache.hadoop.mapred.JobConf; 36 import org.apache.hadoop.mapred.MapReduceBase; 37 import org.apache.hadoop.mapred.Mapper; 38 import org.apache.hadoop.mapred.OutputCollector; 39 import org.apache.hadoop.mapred.Reducer; 40 import org.apache.hadoop.mapred.Reporter; 31 import org.apache.hadoop.mapreduce.Job; 32 import org.apache.hadoop.mapreduce.Mapper; 33 import org.apache.hadoop.mapreduce.Reducer; 34 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 35 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 41 36 import org.apache.hadoop.util.Tool; 42 37 import org.apache.hadoop.util.ToolRunner; … … 50 45 * @author ueshin 51 46 */ 52 private static class Map extends MapReduceBase implements47 private static class Map extends 53 48 Mapper<LongWritable, Text, AccessWritable, IntWritable> { 54 49 … … 58 53 + "\"GET ((?:/[^ ]*)?/(?:[^/]+\\.html)?) 
HTTP/1\\.[01]\" (?:200|304) .*$"); 59 54 60 private staticfinal AccessWritable access = new AccessWritable();55 private final AccessWritable access = new AccessWritable(); 61 56 62 private staticfinal IntWritable one = new IntWritable(1);57 private final IntWritable one = new IntWritable(1); 63 58 64 59 /* 65 60 * (non-Javadoc) 66 * 67 * @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, 68 * java.lang.Object, org.apache.hadoop.mapred.OutputCollector, 69 * org.apache.hadoop.mapred.Reporter) 61 * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, 62 * org.apache.hadoop.mapreduce.Mapper.Context) 70 63 */ 71 64 @Override 72 public void map(LongWritable key, Text value, 73 OutputCollector<AccessWritable, IntWritable> output, 74 Reporter reporter) throws IOException { 65 protected void map(LongWritable key, Text value, Context context) 66 throws IOException, InterruptedException { 75 67 String line = value.toString(); 76 68 Matcher matcher = PATTERN.matcher(line); … … 90 82 return; 91 83 } 92 output.collect(access, one);84 context.write(access, one); 93 85 } 94 86 } … … 98 90 * @author ueshin 99 91 */ 100 private static class Combine extends MapReduceBase implements92 private static class Combine extends 101 93 Reducer<AccessWritable, IntWritable, AccessWritable, IntWritable> { 102 94 103 95 /* 104 96 * (non-Javadoc) 105 * 106 * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, 107 * java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, 108 * org.apache.hadoop.mapred.Reporter) 97 * @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, 98 * java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context) 109 99 */ 110 100 @Override 111 public void reduce(AccessWritable key, Iterator<IntWritable> values, 112 OutputCollector<AccessWritable, IntWritable> output, 113 Reporter reporter) throws IOException { 101 public void reduce(AccessWritable key, Iterable<IntWritable> values, 102 Context context) throws IOException, 
InterruptedException { 114 103 int sum = 0; 115 while(values.hasNext()) {116 sum += value s.next().get();104 for(IntWritable value : values) { 105 sum += value.get(); 117 106 } 118 output.collect(key, new IntWritable(sum));107 context.write(key, new IntWritable(sum)); 119 108 } 120 109 … … 124 113 * @author ueshin 125 114 */ 126 private static class Reduce extends MapReduceBase implements115 private static class Reduce extends 127 116 Reducer<AccessWritable, IntWritable, Text, IntWritable> { 128 117 129 118 /* 130 119 * (non-Javadoc) 131 *132 120 * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, 133 121 * java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, … … 135 123 */ 136 124 @Override 137 public void reduce(AccessWritable key, Iterator<IntWritable> values, 138 OutputCollector<Text, IntWritable> output, Reporter reporter) 139 throws IOException { 125 public void reduce(AccessWritable key, Iterable<IntWritable> values, 126 Context context) throws IOException, InterruptedException { 140 127 int sum = 0; 141 while(values.hasNext()) {142 sum += value s.next().get();128 for(IntWritable value : values) { 129 sum += value.get(); 143 130 } 144 output.collect(new Text(String.format("%s\t%s\t%s", key.getAccess()131 context.write(new Text(String.format("%s\t%s\t%s", key.getAccess() 145 132 .getIp(), key.getAccess().getUrl(), new SimpleDateFormat( 146 133 "yyyy/MM/dd").format(key.getAccess().getAccessDate()))), … … 151 138 /* 152 139 * (non-Javadoc) 153 *154 140 * @see org.apache.hadoop.util.Tool#run(java.lang.String[]) 155 141 */ 156 142 public int run(String[] args) throws Exception { 157 JobConf conf = new JobConf(getConf(), getClass()); 158 conf.setJobName("aggregator"); 143 Job job = new Job(getConf(), "aggregator"); 159 144 160 FileInputFormat.setInputPaths( conf, args[0]);161 FileOutputFormat.setOutputPath( conf, new Path(args[1]));145 FileInputFormat.setInputPaths(job, args[0]); 146 FileOutputFormat.setOutputPath(job, new Path(args[1])); 162 
147 163 conf.setMapperClass(Map.class);164 conf.setCombinerClass(Combine.class);165 conf.setReducerClass(Reduce.class);148 job.setMapperClass(Map.class); 149 job.setCombinerClass(Combine.class); 150 job.setReducerClass(Reduce.class); 166 151 167 conf.setOutputKeyClass(AccessWritable.class);168 conf.setOutputValueClass(IntWritable.class);152 job.setMapOutputKeyClass(AccessWritable.class); 153 job.setMapOutputValueClass(IntWritable.class); 169 154 170 JobClient.runJob(conf); 155 job.setOutputKeyClass(Text.class); 156 job.setOutputValueClass(IntWritable.class); 157 158 job.waitForCompletion(true); 159 171 160 return 0; 172 161 }
