
Hadoop MapReduce Study Notes (4): Implementing SQL-like SELECT MAX(ID) in MapReduce, Part 2 (Some Improvements)

2012-08-09 

This is an original post; please credit the source when reposting: http://guoyunsky.iteye.com/blog/1233723

You are welcome to join the Hadoop super group: 180941958

This blog has moved to my standalone site: http://www.yun5u.com/articles/hadoop-mapreduce-sql-max-2.html

Please read these first:

1. Hadoop MapReduce Study Notes (1): Preface and Preparation

2. Hadoop MapReduce Study Notes (2): Preface and Preparation 2

3. Hadoop MapReduce Study Notes (3): Implementing SQL-like SELECT MAX(ID) in MapReduce


Next post: Hadoop MapReduce Study Notes (5): Implementing SQL-like max and min in MapReduce


Hadoop MapReduce Study Notes (3): Implementing SQL-like SELECT MAX(ID) in MapReduce only implemented finding the maximum in a rough way. Here we introduce another piece of the Hadoop Job configuration: the Combiner. As far as I understand it so far, map output is first written to local files and then goes through combine and merge steps before being handed to reduce, and the reduce side is comparatively expensive because it requires a lot of communication (so I have been told; still to be explored). We therefore want each map task to pass as little data as possible on to the Reduce side, and that is exactly what a Combiner is for. For example, to find the maximum here, each map task produces a batch of numbers; we can take the maximum of that batch first and hand only that on to reduce, which is why a Combiner is used. This is only my current understanding and still needs verification. I look forward to the day I dig into the source code!
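To make that concrete, the key wiring is just a few lines from main() in the full listing below; the explanatory comments here are mine. A combiner is simply a Reducer that Hadoop runs locally on each map task's output, so its input and output key/value types must match the map output types:

    // A combiner is registered like a reducer, but Hadoop runs it locally on
    // each map task's output before the shuffle, so far less data crosses the
    // network. Its input/output types must match the map output types
    // (Text/LongWritable here).
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setMapperClass(MyMapper.class);
    job.setCombinerClass(MyCombiner.class); // local pre-aggregation: one max per batch of map output
    job.setReducerClass(MyReducer.class);   // global aggregation over the per-task maxima

With, say, a million input numbers per map task, the mapper emits a million (K, value) pairs, but after the combiner each map task typically ships only one or a few pairs across the network to the reducer.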


Here is the code:


package com.guoyun.hadoop.mapreduce.study;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Find the maximum value, similar to SQL: SELECT MAX(NUMBER) FROM TABLE
 * Note that the input has only one column.
 * Compared with @GetMaxValueMapReduceTest, this version makes one small
 * improvement: it introduces a Combiner.
 * To take the max or min over one or several columns of multi-column data,
 * see @GetMaxAndMinValueMultiMapReduceTest
 */
public class GetMaxValueMapReduceImproveTest extends MyMapReduceSIngleColumnTest{
  public static final Logger log=LoggerFactory.getLogger(GetMaxValueMapReduceImproveTest.class);

  public GetMaxValueMapReduceImproveTest(String outputPath) {
    super(outputPath);
  }

  /**
   * Map: read the source data, emit every number under a single key
   */
  public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
    private final Text writeKey=new Text("K");
    private LongWritable writeValue=new LongWritable(0);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      log.debug("begin to map");
      StringTokenizer tokenizer=null;
      String lineValue=null;

      tokenizer=new StringTokenizer(value.toString().trim());
      while(tokenizer.hasMoreTokens()){
        lineValue=tokenizer.nextToken().trim();
        if(lineValue.equals("")){
          continue;
        }
        try {
          writeValue.set(Long.parseLong(lineValue));
          context.write(writeKey, writeValue);
        } catch (NumberFormatException e) {
          continue;
        }
      }
    }
  }

  /**
   * Combine: pre-aggregate on the map side so that each map task only
   * hands its local maximum to the reducer
   */
  public static class MyCombiner
    extends Reducer<Text,LongWritable,Text,LongWritable>{
    private final Text maxValueKey=new Text("maxValue");

    @Override
    public void reduce(Text key, Iterable<LongWritable> values,Context context)
        throws IOException, InterruptedException {
      log.debug("begin to combine");
      long maxValue=Long.MIN_VALUE;
      for(LongWritable value:values){
        if(value.get()>maxValue){
          maxValue=value.get();
        }
      }
      context.write(maxValueKey, new LongWritable(maxValue));
    }
  }

  /**
   * Reduce: take the global maximum over the per-task maxima
   */
  public static class MyReducer
    extends Reducer<Text,LongWritable,Text,LongWritable>{
    private final Text maxValueKey=new Text("maxValue");

    @Override
    public void reduce(Text key, Iterable<LongWritable> values,Context context)
        throws IOException, InterruptedException {
      log.debug("begin to reduce");
      long maxValue=Long.MIN_VALUE;
      for(LongWritable value:values){
        if(value.get()>maxValue){
          maxValue=value.get();
        }
      }
      context.write(maxValueKey, new LongWritable(maxValue));
    }
  }

  public static void main(String[] args) {
    MyMapReduceTest mapReduceTest=null;
    Configuration conf=null;
    Job job=null;
    FileSystem fs=null;
    Path inputPath=null;
    Path outputPath=null;
    long begin=0;
    String output="testDatas/mapreduce/MROutput_SingleColumn_getMaxImprove";

    try {
      mapReduceTest=new GetMaxValueMapReduceImproveTest(output);

      inputPath=new Path(mapReduceTest.getInputPath());
      outputPath=new Path(mapReduceTest.getOutputPath());

      conf=new Configuration();
      job=new Job(conf,"getMaxValueImprove");

      // Clear any previous output directory, or the job will fail to start
      fs=FileSystem.getLocal(conf);
      if(fs.exists(outputPath)){
        if(!fs.delete(outputPath,true)){
          System.err.println("Delete output file:"+mapReduceTest.getOutputPath()+" failed!");
          return;
        }
      }

      job.setJarByClass(GetMaxValueMapReduceImproveTest.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(LongWritable.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(LongWritable.class);
      job.setMapperClass(MyMapper.class);
      job.setCombinerClass(MyCombiner.class);
      job.setReducerClass(MyReducer.class);

      job.setNumReduceTasks(2);

      FileInputFormat.addInputPath(job, inputPath);
      FileOutputFormat.setOutputPath(job, outputPath);

      begin=System.currentTimeMillis();
      job.waitForCompletion(true);

      System.out.println("===================================================");
      if(mapReduceTest.isGenerateDatas()){
        System.out.println("The maxValue is:"+mapReduceTest.getMaxValue());
        System.out.println("The minValue is:"+mapReduceTest.getMinValue());
      }
      System.out.println("Spend time:"+(System.currentTimeMillis()-begin));
      // Spend time:11330
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
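Two caveats on the code above (my own notes, not from the original post). First, Hadoop may run a combiner zero, one, or several times per map task, so a combiner must never change the final result; taking a maximum is associative and commutative, so it is safe here. Second, MyCombiner emits the key "maxValue" instead of re-emitting the key "K" it received; in general a combiner should keep the incoming key, since the map output has already been partitioned and sorted by that key, and rewriting it only works here because the job has a single key. Relatedly, with job.setNumReduceTasks(2) and a single key, one of the two reducers receives all the data while the other just writes an empty output file.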
