首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 数据库 > SQL Server >

Hadoop MapReduce 学习札记(七) MapReduce在多字段/列基础上实现类似SQL的max和min

2012-07-15 
Hadoop MapReduce 学习笔记(七) MapReduce在多字段/列基础上实现类似SQL的max和min? ?本博客属原创文章,转

Hadoop MapReduce 学习笔记(七) MapReduce在多字段/列基础上实现类似SQL的max和min

? ?本博客属原创文章,转载请注明出处:http://guoyunsky.iteye.com/blog/1233733

? ?欢迎加入Hadoop超级群:?180941958

?????? 本博客已迁移到本人独立博客:http://www.yun5u.com/articles/hadoop-mapreduce-sql-multi-max-min.html

?????? 请先阅读:??????????

?????????? 1.Hadoop MapReduce 学习笔记(一) 序言和准备

?????????? 2.Hadoop MapReduce 学习笔记(二) 序言和准备 2

?????????? 3.Hadoop MapReduce 学习笔记(三) MapReduce实现类似SQL的SELECT MAX(ID)

?????????? 4.Hadoop MapReduce 学习笔记(四) MapReduce实现类似SQL的SELECT MAX(ID) 2 一些改进

???????????????? 5.Hadoop MapReduce 学习笔记(五) MapReduce实现类似SQL的max和min

???????????????? 6.Hadoop MapReduce 学习笔记(六) MapReduce实现类似SQL的max和min? 正确写法

?

??? 下一篇:Hadoop MapReduce 学习笔记(八) MapReduce实现类似SQL的order by/排序

?

??????? Hadoop MapReduce 学习笔记(六) MapReduce实现类似SQL的max和min? 正确写法只是一列,如序言说的,一张表中有多个列呢?比如想找出序言中USER表最大和最小ID的用户数据,类似SQL:

?SELECT * FROM USER WHERE ID=MAX(ID) OR ID= MIN(ID);

? ? ? 还是贴上代码吧,这里引入的概念是自己实现Hadoop的输入输出.Hadoop自己的是IntWritalbe,Text等,有如Java的int,String.但我们想实现自己的类呢.请看代码吧:


? ? ?1.相对Hadoop来说,自己的输入输出类:

package com.guoyun.hadoop.mapreduce.study;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/** * 多列数据,这里格式是:frameworkName(String)  number(int) * 等同于数据表 * CREATE TABLE TABLE_NAME( *  FRAMEWORK_NAME VARCHAR(32), *  NUMBER INT * ) */public  class MultiColumnWritable implements  WritableComparable{  protected String frameworkName="";  protected long number=-1;    public String getFrameworkName() {    return frameworkName;  }  public void setFrameworkName(String frameworkName) {    this.frameworkName = frameworkName;  }  public long getNumber() {    return number;  }  public void setNumber(long number) {    this.number = number;  }  public MultiColumnWritable() {    super();  }  @Override  public int compareTo(Object obj) {    int result=-1;    if(obj instanceof MultiColumnWritable){      MultiColumnWritable mcw=(MultiColumnWritable)obj;      if(mcw.getNumber()<this.getNumber()){        result =1;      }else if(mcw.getNumber()==this.getNumber()){        result=0;      }    }    return result;  }  @Override  public void readFields(DataInput in) throws IOException {    frameworkName=in.readUTF();    number=in.readLong();  }  @Override  public void write(DataOutput out) throws IOException {    out.writeUTF(frameworkName);    out.writeLong(number);  }  @Override  public String toString() {    return frameworkName+"\t"+number;  }  }

?

? 2.获得最大和最小值

package com.guoyun.hadoop.mapreduce.study;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * 或得最大和最小值,类似SQL:SELECT * FROM TABLE WHERE NUMBER=MAX(NUMBER) OR NUMBER=MIN(NUMBER) * 这里有多列数据,但只取其中一列的最大和最小 * 如果想对其中几列取最大最小值,请自己实现 @MultiColumnWritable */public class GetMaxAndMinValueMultiMapReduceTest extends MyMapReduceMultiColumnTest {    public static final Logger log=LoggerFactory.getLogger(GetMaxAndMinValueMultiMapReduceTest.class);  public GetMaxAndMinValueMultiMapReduceTest(long dataLength) throws Exception {    super(dataLength);    // TODO Auto-generated constructor stub  }  public GetMaxAndMinValueMultiMapReduceTest(String outputPath) throws Exception {    super(outputPath);    // TODO Auto-generated constructor stub  }  public GetMaxAndMinValueMultiMapReduceTest(long dataLength, String inputPath,      String outputPath) throws Exception {    super(dataLength, inputPath, outputPath);    // TODO Auto-generated constructor stub  }      public static class MyCombiner    extends Reducer<Text,MultiColumnWritable,Text,MultiColumnWritable>{    private final Text maxValueKey=new Text("maxValue");    private final Text minValueKey=new Text("minValue");        @Override    public void reduce(Text key, Iterable<MultiColumnWritable> values,Context context)        throws IOException, InterruptedException {      log.debug("begin to combine");      long maxValue=Long.MIN_VALUE;      String maxFrameworkName="";      long minValue=Long.MAX_VALUE;      String minFrameworkName="";      long valueTmp=0;      String nameTmp="";      MultiColumnWritable writeValue=new MultiColumnWritable();            for(MultiColumnWritable value:values){        valueTmp=value.getNumber();        nameTmp=value.getFrameworkName();                // 其实可以用他们的compare方法        if(valueTmp>maxValue){          maxValue=valueTmp;          maxFrameworkName=nameTmp;        }else if(valueTmp<minValue){          minValue=valueTmp;          minFrameworkName=nameTmp;        }      }            writeValue.setFrameworkName(maxFrameworkName);      writeValue.setNumber(maxValue);      context.write(maxValueKey, writeValue);      writeValue.setFrameworkName(minFrameworkName);      writeValue.setNumber(minValue);      context.write(minValueKey, writeValue);    }       }      /**   * Reduce,to get the max value   */  public static class MyReducer     extends Reducer<Text,MultiColumnWritable,Text,MultiColumnWritable>{    private final Text maxValueKey=new Text("maxValue");    private final Text minValueKey=new Text("minValue");          @Override    public void run(Context context) throws IOException, InterruptedException {      long maxValue=Long.MIN_VALUE;      long minValue=Long.MAX_VALUE;      long tmpValue=0;      String tmpFrameworkName="";      String tmpKey="";      String maxFrameworkName="";      String minFrameworkName="";      MultiColumnWritable writeValue=new MultiColumnWritable();       MultiColumnWritable tmpWrite=null;            try {        setup(context);           while(context.nextKey()){          tmpKey=context.getCurrentKey().toString();          tmpWrite=(MultiColumnWritable)context.getCurrentValue();          tmpValue=tmpWrite.getNumber();          tmpFrameworkName=tmpWrite.getFrameworkName();                    if(tmpKey.equals("maxValue")){            if(tmpValue>maxValue){              maxValue=tmpValue;              maxFrameworkName=tmpFrameworkName;            }          }else if(tmpKey.equals("minValue")){            if(tmpValue<minValue){              minValue=tmpValue;              minFrameworkName=tmpFrameworkName;            }          }        }                writeValue.setFrameworkName(maxFrameworkName);        writeValue.setNumber(maxValue);        context.write(maxValueKey, writeValue);        writeValue.setFrameworkName(minFrameworkName);        writeValue.setNumber(minValue);        context.write(minValueKey, writeValue);      } catch (Exception e) {        log.debug(e.getMessage());      }finally{        cleanup(context);      }           }    @Override    protected void cleanup(Context context) throws IOException,        InterruptedException {      // TODO Auto-generated method stub      super.cleanup(context);    }    @Override    protected void setup(Context context) throws IOException,        InterruptedException {      // TODO Auto-generated method stub      super.setup(context);    }      }    /**   * @param args   */  public static void main(String[] args) {    MyMapReduceTest mapReduceTest=null;    Configuration conf=null;    Job job=null;    FileSystem fs=null;    Path inputPath=null;    Path outputPath=null;    long begin=0;    String input="testDatas/mapreduce/MRInput_Multi_getMaxAndMin";    String output="testDatas/mapreduce/MROutput_Multi_getMaxAndMin";            try {      mapReduceTest=new GetMaxAndMinValueMultiMapReduceTest(2000000,input,output);            inputPath=new Path(mapReduceTest.getInputPath());      outputPath=new Path(mapReduceTest.getOutputPath());            conf=new Configuration();      job=new Job(conf,"getMaxAndMinValueMulti");            fs=FileSystem.getLocal(conf);      if(fs.exists(outputPath)){        if(!fs.delete(outputPath,true)){          System.err.println("Delete output file:"+mapReduceTest.getOutputPath()+" failed!");          return;        }      }                  job.setJarByClass(GetMaxAndMinValueMultiMapReduceTest.class);      job.setMapOutputKeyClass(Text.class);      job.setMapOutputValueClass(MultiColumnWritable.class);      job.setOutputKeyClass(Text.class);      job.setOutputValueClass(MultiColumnWritable.class);      job.setMapperClass(MultiSupMapper.class);      job.setCombinerClass(MyCombiner.class);      job.setReducerClass(MyReducer.class);            job.setNumReduceTasks(2);            FileInputFormat.addInputPath(job, inputPath);      FileOutputFormat.setOutputPath(job, outputPath);            begin=System.currentTimeMillis();      job.waitForCompletion(true);            System.out.println("===================================================");      if(mapReduceTest.isGenerateDatas()){        System.out.println("The maxValue is:"+mapReduceTest.getMaxValue());        System.out.println("The minValue is:"+mapReduceTest.getMinValue());      }      System.out.println("Spend time:"+(System.currentTimeMillis()-begin));      // Spend time:13361          } catch (Exception e) {      // TODO Auto-generated catch block      e.printStackTrace();    }      }  }

热点排行
Bad Request.