Changeset 17

Show
Ignore:
Timestamp:
04/21/10 22:17:25 (2 years ago)
Author:
ueshin
Message:

新しいAPIに移行。

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/hadoop-aggregate/src/main/java/st/happy_camper/hadoop/aggregate/Aggregator.java

    r11 r17  
    2020import java.text.SimpleDateFormat; 
    2121import java.util.Date; 
    22 import java.util.Iterator; 
    2322import java.util.Locale; 
    2423import java.util.regex.Matcher; 
     
    3029import org.apache.hadoop.io.LongWritable; 
    3130import org.apache.hadoop.io.Text; 
    32 import org.apache.hadoop.mapred.FileInputFormat; 
    33 import org.apache.hadoop.mapred.FileOutputFormat; 
    34 import org.apache.hadoop.mapred.JobClient; 
    35 import org.apache.hadoop.mapred.JobConf; 
    36 import org.apache.hadoop.mapred.MapReduceBase; 
    37 import org.apache.hadoop.mapred.Mapper; 
    38 import org.apache.hadoop.mapred.OutputCollector; 
    39 import org.apache.hadoop.mapred.Reducer; 
    40 import org.apache.hadoop.mapred.Reporter; 
     31import org.apache.hadoop.mapreduce.Job; 
     32import org.apache.hadoop.mapreduce.Mapper; 
     33import org.apache.hadoop.mapreduce.Reducer; 
     34import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
     35import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
    4136import org.apache.hadoop.util.Tool; 
    4237import org.apache.hadoop.util.ToolRunner; 
     
    5045     * @author ueshin 
    5146     */ 
    52     private static class Map extends MapReduceBase implements 
     47    private static class Map extends 
    5348            Mapper<LongWritable, Text, AccessWritable, IntWritable> { 
    5449 
     
    5853                        + "\"GET ((?:/[^ ]*)?/(?:[^/]+\\.html)?) HTTP/1\\.[01]\" (?:200|304) .*$"); 
    5954 
    60         private static final AccessWritable access = new AccessWritable(); 
     55        private final AccessWritable access = new AccessWritable(); 
    6156 
    62         private static final IntWritable one = new IntWritable(1); 
     57        private final IntWritable one = new IntWritable(1); 
    6358 
    6459        /* 
    6560         * (non-Javadoc) 
    66          *  
    67          * @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, 
    68          * java.lang.Object, org.apache.hadoop.mapred.OutputCollector, 
    69          * org.apache.hadoop.mapred.Reporter) 
     61         * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, 
     62         * org.apache.hadoop.mapreduce.Mapper.Context) 
    7063         */ 
    7164        @Override 
    72         public void map(LongWritable key, Text value, 
    73                 OutputCollector<AccessWritable, IntWritable> output, 
    74                 Reporter reporter) throws IOException { 
     65        protected void map(LongWritable key, Text value, Context context) 
     66                throws IOException, InterruptedException { 
    7567            String line = value.toString(); 
    7668            Matcher matcher = PATTERN.matcher(line); 
     
    9082                    return; 
    9183                } 
    92                 output.collect(access, one); 
     84                context.write(access, one); 
    9385            } 
    9486        } 
     
    9890     * @author ueshin 
    9991     */ 
    100     private static class Combine extends MapReduceBase implements 
     92    private static class Combine extends 
    10193            Reducer<AccessWritable, IntWritable, AccessWritable, IntWritable> { 
    10294 
    10395        /* 
    10496         * (non-Javadoc) 
    105          *  
    106          * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, 
    107          * java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, 
    108          * org.apache.hadoop.mapred.Reporter) 
     97         * @see org.apache.hadoop.mapreduce.Reducer#reduce(KEYIN, 
     98         * java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer.Context) 
    10999         */ 
    110100        @Override 
    111         public void reduce(AccessWritable key, Iterator<IntWritable> values, 
    112                 OutputCollector<AccessWritable, IntWritable> output, 
    113                 Reporter reporter) throws IOException { 
     101        public void reduce(AccessWritable key, Iterable<IntWritable> values, 
     102                Context context) throws IOException, InterruptedException { 
    114103            int sum = 0; 
    115             while(values.hasNext()) { 
    116                 sum += values.next().get(); 
     104            for(IntWritable value : values) { 
     105                sum += value.get(); 
    117106            } 
    118             output.collect(key, new IntWritable(sum)); 
     107            context.write(key, new IntWritable(sum)); 
    119108        } 
    120109 
     
    124113     * @author ueshin 
    125114     */ 
    126     private static class Reduce extends MapReduceBase implements 
     115    private static class Reduce extends 
    127116            Reducer<AccessWritable, IntWritable, Text, IntWritable> { 
    128117 
    129118        /* 
    130119         * (non-Javadoc) 
    131          *  
    132120         * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, 
    133121         * java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, 
     
    135123         */ 
    136124        @Override 
    137         public void reduce(AccessWritable key, Iterator<IntWritable> values, 
    138                 OutputCollector<Text, IntWritable> output, Reporter reporter) 
    139                 throws IOException { 
     125        public void reduce(AccessWritable key, Iterable<IntWritable> values, 
     126                Context context) throws IOException, InterruptedException { 
    140127            int sum = 0; 
    141             while(values.hasNext()) { 
    142                 sum += values.next().get(); 
     128            for(IntWritable value : values) { 
     129                sum += value.get(); 
    143130            } 
    144             output.collect(new Text(String.format("%s\t%s\t%s", key.getAccess() 
     131            context.write(new Text(String.format("%s\t%s\t%s", key.getAccess() 
    145132                    .getIp(), key.getAccess().getUrl(), new SimpleDateFormat( 
    146133                    "yyyy/MM/dd").format(key.getAccess().getAccessDate()))), 
     
    151138    /* 
    152139     * (non-Javadoc) 
    153      *  
    154140     * @see org.apache.hadoop.util.Tool#run(java.lang.String[]) 
    155141     */ 
    156142    public int run(String[] args) throws Exception { 
    157         JobConf conf = new JobConf(getConf(), getClass()); 
    158         conf.setJobName("aggregator"); 
     143        Job job = new Job(getConf(), "aggregator"); 
    159144 
    160         FileInputFormat.setInputPaths(conf, args[0]); 
    161         FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
     145        FileInputFormat.setInputPaths(job, args[0]); 
     146        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    162147 
    163         conf.setMapperClass(Map.class); 
    164         conf.setCombinerClass(Combine.class); 
    165         conf.setReducerClass(Reduce.class); 
     148        job.setMapperClass(Map.class); 
     149        job.setCombinerClass(Combine.class); 
     150        job.setReducerClass(Reduce.class); 
    166151 
    167         conf.setOutputKeyClass(AccessWritable.class); 
    168         conf.setOutputValueClass(IntWritable.class); 
     152        job.setMapOutputKeyClass(AccessWritable.class); 
     153        job.setMapOutputValueClass(IntWritable.class); 
    169154 
    170         JobClient.runJob(conf); 
     155        job.setOutputKeyClass(Text.class); 
     156        job.setOutputValueClass(IntWritable.class); 
     157 
     158        job.waitForCompletion(true); 
     159 
    171160        return 0; 
    172161    }