首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

SecondarySort代码的诠释

2012-11-20 
SecondarySort代码的注释package org.apache.hadoop.examples import java.io.DataInput import java.io

SecondarySort代码的注释

package org.apache.hadoop.examples; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; /** * hadoop的map/reduce自带的例子代码,目的是演示二次排序, * 输入是文本文件,文本的每行是两个用空格分隔的整数,程序的输出结果是 * 按前一个整数排序,然后按后一个整数排序。应用场景是:数据join的reduce * 端的算法。 * * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort *            in-dir out-dir */ public class SecondarySort {   /**    * 自定义map的输出key类型,而不是默认的Text类型,他必须实现接口comparable ,作用是输出中对key自定义排序(倒序),实现中必须提供数据的读写方法(readFields,write)。等于(equals)和比较方法(compareTo)。    *    */   public static class IntPair                       implements WritableComparable {     private int first = 0;     private int second = 0;         /**      * Set the left and right values.      */     public void set(int left, int right) {       first = left;       second = right;     }     public int getFirst() {       return first;     }     public int getSecond() {       return second;     }     /**      * 按编码读两个整数,编码是:读第一个整数+ Integer.MIN_VALUE+读第二个整数 +  Integer.MIN_VALUE      * Encoded as: MIN_VALUE -&gt; 0, 0 -&gt; -MIN_VALUE, MAX_VALUE-&gt; -1      */     @Override     public void readFields(DataInput in) throws IOException {       first = in.readInt() + Integer.MIN_VALUE;       second = in.readInt() + Integer.MIN_VALUE;     }    /*按编码写两个整数。编码是:写第一个整数-Integer.MIN_VALUE +写第二个整数 -  Integer.MIN_VALUE */       @Override     public void write(DataOutput out) throws IOException {       out.writeInt(first - Integer.MIN_VALUE);       out.writeInt(second - Integer.MIN_VALUE);     }   /*哈希方法,估计用于对象比较 */    @Override     public int hashCode() {       return first * 157 + second;     } /*等于方法,和compareTo方法用于排序,而且两者必须一致,不然,排序的结果可能不对。 */    @Override     public boolean equals(Object right) {       if (right instanceof IntPair) {         IntPair r = (IntPair) right;         return r.first == first &amp;&amp; r.second == second;       } else {         return false;       }     }     /** 内置的静态类,用于对象的排序比较。如果没有这个类,排序的话    用 IntPair类自身的等于和比较函数来排序*/     public static class Comparator extends WritableComparator {       public Comparator() {         super(IntPair.class);       }      /*排序方法,b1第一个对象的字节数组第一个元素,s1首个字节的长度,     l1整个字节数组的长度,     b2第二个对象的字节数组第一个元素,s2首个字节的长度,     l2整个字节数组的长度, */      public int compare(byte[] b1, int s1, int l1,                          byte[] b2, int s2, int l2) {         return compareBytes(b1, s1, l1, b2, s2, l2);//比较对象的字节数组,遇到第一个不同立即返回       }     }     static {                                        // 登记比较器comparator,对象     IntPair按比较器的定义排序       WritableComparator.define(IntPair.class, new Comparator());     } /*对象的比较方法 */    @Override      public int compareTo(IntPair o) {      if (first != o.first) {        return first < o.first ? -1 : 1;      } else if (second != o.second) {        return second < o.second ? -1 : 1;      } else {        return 0;      }    }  } /**    * pair对象的Partitioner方法。默认是hashpartition方法。    目的是按前一个整数(firest)来决定让那个reducer来处理   */   public static class FirstPartitioner extends Partitioner{     @Override     public int getPartition(IntPair key, IntWritable value,                             int numPartitions) {       return Math.abs(key.getFirst() * 127) % numPartitions;     }   }   /**    *  reducer的输出按第一个整数来输出,自定义的比较器   */   public static class FirstGroupingComparator                 implements RawComparator { /*比较字节数组,注释掉该函数会报错 */    @Override     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {       return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,                                              b2, s2, Integer.SIZE/8);     } /*作用不明,注释掉改函数,不会报错。 */   @Override    public int compare(IntPair o1, IntPair o2) {      int l = o1.getFirst();      int r = o2.getFirst();      return l == r ? 0 : (l < r ? -1 : 1);    }  }/**mapper类,输出的key是自定义的IntPair    * Read two integers from each line and generate a key, value pair    * as ((left, right), right).    */   public static class MapClass          extends Mapper {         private final IntPair key = new IntPair();     private final IntWritable value = new IntWritable();         @Override     public void map(LongWritable inKey, Text inValue,                     Context context) throws IOException, InterruptedException {       StringTokenizer itr = new StringTokenizer(inValue.toString());       int left = 0;       int right = 0;       if (itr.hasMoreTokens()) {         left = Integer.parseInt(itr.nextToken());         if (itr.hasMoreTokens()) {           right = Integer.parseInt(itr.nextToken());         }         key.set(left, right);         value.set(right);         context.write(key, value);       }     }   }     /**reducer类    * A reducer class that just emits the sum of the input values.    */   public static class Reduce          extends Reducer {     private static final Text SEPARATOR =       new Text(&quot;------------------------------------------------&quot;);     private final Text first = new Text();         @Override     public void reduce(IntPair key, Iterable values,                        Context context                        ) throws IOException, InterruptedException {       context.write(SEPARATOR, null);       first.set(Integer.toString(key.getFirst()));       for(IntWritable value: values) {         context.write(first, value);       }     }   }     public static void main(String[] args) throws Exception {     Configuration conf = new Configuration();     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();     if (otherArgs.length != 2) {       System.err.println(&quot;Usage: secondarysrot &quot;);       System.exit(2);     }      Job job = new Job(conf, "secondary sort");//生成job对象     job.setJarByClass(SecondarySort.class);//指定jar包的名字     job.setMapperClass(MapClass.class);//设置mapper类     job.setReducerClass(Reduce.class);//设置reducer类     // group and partition by the first int in the pair     job.setPartitionerClass(FirstPartitioner.class);//设置自定义的partition类     job.setGroupingComparatorClass(FirstGroupingComparator.class);     //设置分组比较器。(按前一个整数来输出)     // the map output is IntPair, IntWritable     job.setMapOutputKeyClass(IntPair.class);//设置mapper输出的key类型     job.setMapOutputValueClass(IntWritable.class);//设置mapper输出的value类型     // the reduce output is Text, IntWritable     job.setOutputKeyClass(Text.class); //设置reducer输出的key的类型     job.setOutputValueClass(IntWritable.class);//设置reuducer输出的value的类型         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//设置输入路径     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//设置输出路径     System.exit(job.waitForCompletion(true) ? 0 : 1);   } }   

?

热点排行