首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

RandomWriter代码诠释

2012-10-31 
RandomWriter代码注释:package org.apache.hadoop.examples; import java.io.IOException; import java.util.…

RandomWriter代码注释

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Date;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Hadoop map/reduce example whose main purpose is to generate binary files
 * of random data. It defines a custom {@link InputFormat} that supplies
 * virtual input to the mappers, and uses counters to track some statistics.
 *
 * This program uses map/reduce to just run a distributed job where there is
 * no interaction between the tasks and each task write a large unsorted
 * random binary sequence file of BytesWritable.
 * In order for this program to generate data for terasort with 10-byte keys
 * and 90-byte values, have the following config:
 * <xmp>
 * <?xml version="1.0"?>
 * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 * <configuration>
 *   <property>
 *     <name>test.randomwrite.min_key</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.max_key</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.min_value</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.max_value</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.total_bytes</name>
 *     <value>1099511627776</value>
 *   </property>
 * </configuration></xmp>
 *
 * Equivalently, {@link RandomWriter} also supports all the above options
 * and ones supported by {@link GenericOptionsParser} via the command-line.
 */
public class RandomWriter extends Configured implements Tool {

  /**
   * User counters
   */
  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }

  /**
   * A custom input format that creates virtual inputs of a single string
   * for each map. It implements the two {@link InputFormat} methods used
   * here: {@code getSplits} and {@code getRecordReader}.
   */
  static class RandomInputFormat implements InputFormat<Text, Text> {

    /**
     * Generate the requested number of file splits, with the filename
     * set to the filename of the output file.
     *
     * Each split is a {@link FileSplit} built from four arguments: the file
     * path, the starting byte offset, the length in bytes, and the array of
     * host locations (null here, since the input is virtual).
     *
     * @param job the job configuration; its output path is the parent of
     *            every dummy split path
     * @param numSplits the number of splits to create
     * @return an array of {@code numSplits} dummy splits
     * @throws IOException declared by the {@link InputFormat} contract
     */
    public InputSplit[] getSplits(JobConf job,
                                  int numSplits) throws IOException {
      InputSplit[] result = new InputSplit[numSplits];
      Path outDir = FileOutputFormat.getOutputPath(job);
      for (int i = 0; i < result.length; ++i) {
        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
                                  (String[]) null);
      }
      return result;
    }

    /**
     * Custom record reader (a static nested class) used to read a split.
     * Return a single record (filename, "") where the filename is taken from
     * the file split.
     */
    static class RandomRecordReader implements RecordReader<Text, Text> {
      /** Path of the split; set to null once the single record is emitted. */
      Path name;

      public RandomRecordReader(Path p) {
        name = p;
      }

      /**
       * Emits the one record on the first call and nothing afterwards.
       *
       * @param key receives the file name of the split path
       * @param value left untouched
       * @return true on the first call, false on every later call
       */
      public boolean next(Text key, Text value) {
        if (name != null) {
          key.set(name.getName());
          name = null; // marks the single record as consumed
          return true;
        }
        return false;
      }

      public Text createKey() {
        return new Text();
      }

      public Text createValue() {
        return new Text();
      }

      public long getPos() {
        return 0;
      }

      public void close() {}

      public float getProgress() {
        return 0.0f;
      }
    }

    /**
     * Creates the reader for one split.
     *
     * @param split the split to read; cast to {@link FileSplit}
     * @param job unused
     * @param reporter unused
     * @return a reader that yields the split's file name as its only record
     * @throws IOException declared by the {@link InputFormat} contract
     */
    public RecordReader<Text, Text> getRecordReader(InputSplit split,
                                                    JobConf job,
                                                    Reporter reporter)
        throws IOException {
      return new RandomRecordReader(((FileSplit) split).getPath());
    }
  }

  /**
   * The mapper: ignores its input record and emits random key/value pairs
   * until the configured number of bytes has been written.
   */
  static class Map extends MapReduceBase
    implements Mapper<WritableComparable, Writable,
                      BytesWritable, BytesWritable> {

    private long numBytesToWrite; // total number of bytes still to generate
    private int minKeySize;       // minimum key size
    private int keySizeRange;     // max_key - min_key
    private int minValueSize;     // minimum value size
    private int valueSizeRange;   // max_value - min_value
    private Random random = new Random();
    private BytesWritable randomKey = new BytesWritable();
    private BytesWritable randomValue = new BytesWritable();

    /**
     * Fills {@code data[offset .. offset + length - 1]} with random bytes,
     * one random number per byte.
     */
    private void randomizeBytes(byte[] data, int offset, int length) {
      for (int i = offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }

    /**
     * Rejects a size configuration whose maximum is below its minimum.
     * Without this check the negative range only surfaces later, as an
     * unexplained IllegalArgumentException from {@link Random#nextInt(int)}
     * in the middle of the map task.
     *
     * @param kind "key" or "value"; used to name the offending properties
     * @param minSize the configured minimum size
     * @param maxSize the configured maximum size
     * @throws IllegalArgumentException if {@code maxSize < minSize}
     */
    private static void checkSizeBounds(String kind, int minSize, int maxSize) {
      if (maxSize < minSize) {
        throw new IllegalArgumentException(
            "test.randomwrite.max_" + kind + " (" + maxSize
            + ") must not be smaller than test.randomwrite.min_" + kind
            + " (" + minSize + ")");
      }
    }

    /**
     * Given an output filename, write a bunch of random records to it.
     *
     * @param key ignored
     * @param value ignored
     * @param output collector receiving the random key/value pairs
     * @param reporter receives counter increments and status updates
     * @throws IOException if collecting a record fails
     */
    public void map(WritableComparable key,
                    Writable value,
                    OutputCollector<BytesWritable, BytesWritable> output,
                    Reporter reporter) throws IOException {
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        // nextInt(0) would throw, so a zero range means a fixed size.
        int keyLength = minKeySize +
          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize +
          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        output.collect(randomKey, randomValue); // emit the random key and value
        numBytesToWrite -= keyLength + valueLength;
        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
        if (++itemCount % 200 == 0) {
          reporter.setStatus("wrote record " + itemCount + ". " +
                             numBytesToWrite + " bytes left.");
        }
      }
      reporter.setStatus("done with " + itemCount + " records.");
    }

    /**
     * Save the values out of the configuration that we need to write
     * the data.
     *
     * @param job the job configuration to read the sizes from
     * @throws IllegalArgumentException if a configured maximum size is
     *         smaller than the matching minimum size
     */
    @Override
    public void configure(JobConf job) {
      numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
                                    1*1024*1024*1024);

      minKeySize = job.getInt("test.randomwrite.min_key", 10);
      int maxKeySize = job.getInt("test.randomwrite.max_key", 1000);
      checkSizeBounds("key", minKeySize, maxKeySize);
      keySizeRange = maxKeySize - minKeySize;

      minValueSize = job.getInt("test.randomwrite.min_value", 0);
      int maxValueSize = job.getInt("test.randomwrite.max_value", 20000);
      checkSizeBounds("value", minValueSize, maxValueSize);
      valueSizeRange = maxValueSize - minValueSize;
    }

  }

  /**
   * This is the main routine (the driver) for launching a distributed random
   * write job. By default it runs 10 maps per task tracker and each map
   * writes 1 GiB of data to a DFS file. The reduce doesn't do anything.
   *
   * @param args command-line arguments; {@code args[0]} is the output
   *             directory
   * @return 0 on success, -1 if no output directory was given, -2 if
   *         {@code test.randomwrite.bytes_per_map} is zero or negative
   * @throws Exception if the job cannot be submitted or fails
   */
  public int run(String[] args) throws Exception {
    if (args.length == 0) {
      System.out.println("Usage: writer <out-dir>");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Path outDir = new Path(args[0]);
    JobConf job = new JobConf(getConf());

    job.setJarByClass(RandomWriter.class);
    job.setJobName("random-writer");
    FileOutputFormat.setOutputPath(job, outDir);

    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);

    job.setInputFormat(RandomInputFormat.class); // input format class
    job.setMapperClass(Map.class);
    job.setReducerClass(IdentityReducer.class);
    job.setOutputFormat(SequenceFileOutputFormat.class); // output format class

    JobClient client = new JobClient(job);
    ClusterStatus cluster = client.getClusterStatus();
    // Note: this key is spelled "randomwriter", unlike the
    // "test.randomwrite.*" keys used everywhere else in this class.
    int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
    long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
                                             1*1024*1024*1024);
    if (numBytesToWritePerMap == 0) {
      System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
      return -2;
    }
    if (numBytesToWritePerMap < 0) {
      // A negative per-map size makes every mapper's write loop a no-op and
      // yields a negative map count when total_bytes is positive.
      System.err.println("Cannot have test.randomwrite.bytes_per_map set to "
                         + "a negative value: " + numBytesToWritePerMap);
      return -2;
    }
    long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
         numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
    if (numMaps == 0 && totalBytesToWrite > 0) {
      // Less than one map's worth of data requested: one map writes it all.
      numMaps = 1;
      job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
    }

    job.setNumMapTasks(numMaps);
    System.out.println("Running " + numMaps + " maps.");

    // reducer NONE
    job.setNumReduceTasks(0); // set the number of reducers to 0

    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    JobClient.runJob(job);
    Date endTime = new Date();
    System.out.println("Job ended: " + endTime);
    System.out.println("The job took " +
                       (endTime.getTime() - startTime.getTime()) /1000 +
                       " seconds.");

    return 0;
  }

  /**
   * Command-line entry point: runs the tool through {@link ToolRunner} and
   * exits with its return code.
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
    System.exit(res);
  }
}

?

热点排行