首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

PiEstimator代码诠释

2012-11-01 
PiEstimator代码注释:package org.apache.hadoop.examples; import java.io.IOException; import java.math.B……

PiEstimator代码注释

package org.apache.hadoop.examples;import java.io.IOException;import java.math.BigDecimal;import java.util.Iterator;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BooleanWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.SequenceFile.CompressionType;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.SequenceFileInputFormat;import org.apache.hadoop.mapred.SequenceFileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/**hadoop的map/reduce程序例子程序,演示用准蒙特-卡洛方法估算PI的值。这是欧洲最早计算PI的方法。在一个单位矩形中,内切一个圆。往给矩形内投任意次针,记下针在圆内的次数,和投的总次数。当数据足够多的时候,圆内的次数约等于圆的面积,总次数约等于单位矩形的面积,在园内次数/总次数=园面积/单位矩形面积=(PI/4)/1所以PI大概等于4*(园内次数/总次数) * A Map-reduce program to estimate the value of Pi * using quasi-Monte Carlo method. * * Mapper: *   Generate points in a unit square *   and then count points inside/outside of the inscribed circle of the square. * * Reducer: *   Accumulate points inside/outside results from the mappers. * * Let numTotal = numInside + numOutside. * The fraction numInside/numTotal is a rational approximation of * the value (Area of the circle)/(Area of the square), * where the area of the inscribed circle is Pi/4 * and the area of unit square is 1. * Then, Pi is estimated value to be 4(numInside/numTotal).   
*/public class PiEstimator extends Configured implements Tool {  /** tmp directory for input/output */  static private final Path TMP_DIR = new Path(      PiEstimator.class.getSimpleName() + "_TMP_3_141592654");    /** 二维哈尔顿序列的类,哈尔顿序列常常用来产生空间点,因为这个序列的数看上去想随机的。可以用任意一个素数做基数,来生成一系列的的序列。比如说以2的基数,产生的哈尔顿序列是:1/2, 1/4, 3/4, 1/8, 5/8, 3/8, 7/8, 1/16, 9/16。  实现的伪代码如下:FUNCTION (index, base)   BEGIN       result = 0;       f = 1 / base;       i = index;       WHILE (i > 0)        BEGIN           result = result + f * (i % base);           i = FLOOR(i / base);           f = f / base;       END       RETURN result;   END 2-dimensional Halton sequence {H(i)},   * where H(i) is a 2-dimensional point and i >= 1 is the index.   * Halton sequence is used to generate sample points for Pi estimation.    */  private static class HaltonSequence {    /** Bases */    static final int[] P = {2, 3};     /** Maximum number of digits allowed */    static final int[] K = {63, 40};     private long index;    private double[] x;    private double[][] q;    private int[][] d;    /** Initialize to H(startindex),     * so the sequence begins with H(startindex+1).     */    HaltonSequence(long startindex) {      index = startindex;      x = new double[K.length];      q = new double[K.length][];      d = new int[K.length][];      for(int i = 0; i < K.length; i++) {        q[i] = new double[K[i]];        d[i] = new int[K[i]];      }      for(int i = 0; i < K.length; i++) {        long k = index;        x[i] = 0;                for(int j = 0; j < K[i]; j++) {          q[i][j] = (j == 0? 1.0: q[i][j-1])/P[i];          d[i][j] = (int)(k % P[i]);          k = (k - d[i][j])/P[i];          x[i] += d[i][j] * q[i][j];        }      }    }    /**   生成下一个随机点 Compute next point.     * Assume the current point is H(index).     * Compute H(index+1).     
*      * @return a 2-dimensional point with coordinates in [0,1)^2     */    double[] nextPoint() {      index++;      for(int i = 0; i < K.length; i++) {        for(int j = 0; j < K[i]; j++) {          d[i][j]++;          x[i] += q[i][j];          if (d[i][j] < P[i]) {            break;          }          d[i][j] = 0;          x[i] -= (j == 0? 1.0: q[i][j-1]);        }      }      return x;    }  }  /**mapper类输入是offset从0开始的序列的序号,size 是每个map处理的点的大小输出 true(圆内),数目;false(圆外),数目   * Mapper class for Pi estimation.   * Generate points in a unit square   * and then count points inside/outside of the inscribed circle of the square.   */  public static class PiMapper extends MapReduceBase    implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {    /** Map method.     * @param offset samples starting from the (offset+1)th sample.     * @param size the number of samples for this map     * @param out output {ture->numInside, false->numOutside}     * @param reporter     */    public void map(LongWritable offset,                    LongWritable size,                    OutputCollector<BooleanWritable, LongWritable> out,                    Reporter reporter) throws IOException {      final HaltonSequence haltonsequence = new HaltonSequence(offset.get());      long numInside = 0L;      long numOutside = 0L;      for(long i = 0; i < size.get(); ) {        //generate points in a unit square        final double[] point = haltonsequence.nextPoint();        //判断点是否在圆内,并且对在圆内情况和圆外情况计数count points inside/outside of the inscribed circle of the square        final double x = point[0] - 0.5;        final double y = point[1] - 0.5;        if (x*x + y*y > 0.25) {          numOutside++;        } else {          numInside++;        }        //report status        i++;        if (i % 1000 == 0) {          reporter.setStatus("Generated " + i + " samples.");        }      }      //output map results      out.collect(new BooleanWritable(true), new 
LongWritable(numInside));      out.collect(new BooleanWritable(false), new LongWritable(numOutside));    }  }  /**reducer类   * Reducer class for Pi estimation.   * Accumulate points inside/outside results from the mappers.   */  public static class PiReducer extends MapReduceBase    implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {        private long numInside = 0; //公共变量    private long numOutside = 0;//公共变量    private JobConf conf; //configuration for accessing the file system          /**保存job做公共变量,为了方便close方法调用。      Store job configuration. */    @Override    public void configure(JobConf job) {      conf = job;    }    /**统计map的总的圆内数目和园外数目     * Accumulate number of points inside/outside results from the mappers.     * @param isInside Is the points inside?      * @param values An iterator to a list of point counts     * @param output dummy, not used here.     * @param reporter     */    public void reduce(BooleanWritable isInside,                       Iterator<LongWritable> values,                       OutputCollector<WritableComparable<?>, Writable> output,                       Reporter reporter) throws IOException {      if (isInside.get()) {        for(; values.hasNext(); numInside += values.next().get());      } else {        for(; values.hasNext(); numOutside += values.next().get());      }    }    /**job结束,把圆内数目和圆外数目写到一个文件里     * Reduce task done, write output to a file.     
*/    @Override    public void close() throws IOException {      //write output to a file      Path outDir = new Path(TMP_DIR, "out");      Path outFile = new Path(outDir, "reduce-out");      FileSystem fileSys = FileSystem.get(conf);      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,          outFile, LongWritable.class, LongWritable.class,           CompressionType.NONE);      writer.append(new LongWritable(numInside), new LongWritable(numOutside));      writer.close();    }  }  /**   * Run a map/reduce job for estimating Pi.   *   * @return the estimated value of Pi   */  public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf      ) throws IOException {    //setup job conf    jobConf.setJobName(PiEstimator.class.getSimpleName()); //设置job的名字    jobConf.setInputFormat(SequenceFileInputFormat.class); //设置输入格式二进制格式SequenceFileInputFormat    jobConf.setOutputKeyClass(BooleanWritable.class);//设置map输出key类型    jobConf.setOutputValueClass(LongWritable.class);//设置map输出value类型    jobConf.setOutputFormat(SequenceFileOutputFormat.class); //设置输出文件是二进制类型SequenceFileOutputFormat    jobConf.setMapperClass(PiMapper.class);//设置map类    jobConf.setNumMapTasks(numMaps);//设置map的数目    jobConf.setReducerClass(PiReducer.class);//设置reduce的类    jobConf.setNumReduceTasks(1);//设置只有一个reduce,不然没法做总的数据统计    // turn off speculative execution, because DFS doesn't handle    // multiple writers to the same file.    jobConf.setSpeculativeExecution(false);   //关闭speculative execution属性,因为DFS不能处理多个writers操作同一一个文件    //setup input/output directories建立输入输出目录    final Path inDir = new Path(TMP_DIR, "in");    final Path outDir = new Path(TMP_DIR, "out");    FileInputFormat.setInputPaths(jobConf, inDir);    FileOutputFormat.setOutputPath(jobConf, outDir);    final FileSystem fs = FileSystem.get(jobConf);    if (fs.exists(TMP_DIR)) {      throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)          + " already exists.  
Please remove it first.");    }    if (!fs.mkdirs(inDir)) {      throw new IOException("Cannot create input directory " + inDir);    }  /*创建numMaps个文件,文件名是part+ i ,内容之有一个(key,value)对分别是(offset ,size)*/    try {      //generate an input file for each map task      for(int i=0; i < numMaps; ++i) {        final Path file = new Path(inDir, "part"+i);        final LongWritable offset = new LongWritable(i * numPoints);        final LongWritable size = new LongWritable(numPoints);        final SequenceFile.Writer writer = SequenceFile.createWriter(            fs, jobConf, file,            LongWritable.class, LongWritable.class, CompressionType.NONE);        try {          writer.append(offset, size);        } finally {          writer.close();        }        System.out.println("Wrote input for Map #"+i);      }        //start a map/reduce job      System.out.println("Starting Job");      final long startTime = System.currentTimeMillis();      JobClient.runJob(jobConf);      final double duration = (System.currentTimeMillis() - startTime)/1000.0;      System.out.println("Job Finished in " + duration + " seconds");/*从输出结果文件reduce-out中读取结果圆内数目和圆外数目*/      //read outputs      Path inFile = new Path(outDir, "reduce-out");      LongWritable numInside = new LongWritable();      LongWritable numOutside = new LongWritable();      SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);      try {        reader.next(numInside, numOutside);      } finally {        reader.close();      }       //算出PI的值:于4*(园内次数/总次数) compute estimated value      return BigDecimal.valueOf(4).setScale(20)          .multiply(BigDecimal.valueOf(numInside.get()))          .divide(BigDecimal.valueOf(numMaps))          .divide(BigDecimal.valueOf(numPoints));    } finally {      fs.delete(TMP_DIR, true);//删除临时目录    }  }  /**   * Parse arguments and then runs a map/reduce job.   * Print output in standard out.   *    * @return a non-zero if there is an error.  Otherwise, return 0.     
*/  public int run(String[] args) throws Exception {    if (args.length != 2) {      System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");      ToolRunner.printGenericCommandUsage(System.err);      return -1;    }        final int nMaps = Integer.parseInt(args[0]);    final long nSamples = Long.parseLong(args[1]);            System.out.println("Number of Maps  = " + nMaps);    System.out.println("Samples per Map = " + nSamples);            final JobConf jobConf = new JobConf(getConf(), getClass());    System.out.println("Estimated value of Pi is "        + estimate(nMaps, nSamples, jobConf));    return 0;  }  /**   * main method for running it as a stand alone command.    */  public static void main(String[] argv) throws Exception {    System.exit(ToolRunner.run(null, new PiEstimator(), argv));  }}
?

热点排行