
Hadoop MapReduce Study Notes (8): Implementing SQL-like ORDER BY / Sorting with MapReduce

2012-06-26 

This blog post is original content; please credit the source when reposting: http://guoyunsky.iteye.com/blog/1235945

This blog has moved to my standalone site: http://www.yun5u.com/articles/hadoop-mapreduce-sql-order-by-sort.html


Please read these first:

1. Hadoop MapReduce Study Notes (1): Preface and Preparation

2. Hadoop MapReduce Study Notes (2): Preface and Preparation 2


Next post: Hadoop MapReduce Study Notes (9): The Correct Way to Implement SQL-like ORDER BY / Sorting with MapReduce


Sorting is an important step, equivalent to SQL's SELECT * FROM TABLE ORDER BY ID. How can we implement it with MapReduce?


package com.guoyun.hadoop.mapreduce.study;

import java.io.IOException;
import java.util.PriorityQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sort multi-column data on one column in ascending order and return all rows,
 * similar to the SQL:
 *   SELECT * FROM TABLE ORDER BY ID ASC;
 * Note: this is a flawed implementation that can run out of memory. See
 * @OrderByMapReduceFixTest for a version that avoids the overflow.
 */
public class OrderBySingleMapReduceTest extends MyMapReduceMultiColumnTest {
  public static final Logger log = LoggerFactory.getLogger(OrderBySingleMapReduceTest.class);

  public OrderBySingleMapReduceTest(long dataLength) throws Exception {
    super(dataLength);
  }

  public OrderBySingleMapReduceTest(String outputPath) throws Exception {
    super(outputPath);
  }

  public OrderBySingleMapReduceTest(long dataLength, String inputPath,
      String outputPath) throws Exception {
    super(dataLength, inputPath, outputPath);
  }

  private static class MyReducer
      extends Reducer<Text, MultiColumnWritable, NullWritable, MultiColumnWritable> {
    // Buffers every row in memory; this is what overflows on large inputs.
    PriorityQueue<MultiColumnWritable> queue = new PriorityQueue<MultiColumnWritable>();

    @Override
    protected void reduce(Text key, Iterable<MultiColumnWritable> values,
        Context context) throws IOException, InterruptedException {
      MultiColumnWritable copy = null;
      // Hadoop reuses the value object, so each row must be copied before queuing.
      for (MultiColumnWritable value : values) {
        copy = MultiColumnWritable.copy(value);
        queue.add(copy);
      }

      // Emit the rows in ascending order.
      while (!queue.isEmpty()) {
        copy = queue.poll();
        if (copy != null) {
          context.write(NullWritable.get(), copy);
        }
      }
    }
  }

  public static void main(String[] args) {
    MyMapReduceTest mapReduceTest = null;
    Configuration conf = null;
    Job job = null;
    FileSystem fs = null;
    Path inputPath = null;
    Path outputPath = null;
    long begin = 0;
    String input = "testDatas/mapreduce/MRInput_Single_OrderBy";
    String output = "testDatas/mapreduce/MROutput_Single_OrderBy";

    try {
      mapReduceTest = new OrderBySingleMapReduceTest(1000, input, output);

      inputPath = new Path(mapReduceTest.getInputPath());
      outputPath = new Path(mapReduceTest.getOutputPath());

      conf = new Configuration();
      job = new Job(conf, "OrderBy");

      // Delete any previous output so the job can run again.
      fs = FileSystem.getLocal(conf);
      if (fs.exists(outputPath)) {
        if (!fs.delete(outputPath, true)) {
          System.err.println("Delete output file:" + mapReduceTest.getOutputPath() + " failed!");
          return;
        }
      }

      job.setJarByClass(OrderBySingleMapReduceTest.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(MultiColumnWritable.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(MultiColumnWritable.class);
      job.setMapperClass(MultiSupMapper.class);
      job.setReducerClass(MyReducer.class);

      job.setNumReduceTasks(2);

      FileInputFormat.addInputPath(job, inputPath);
      FileOutputFormat.setOutputPath(job, outputPath);

      begin = System.currentTimeMillis();
      job.waitForCompletion(true);

      System.out.println("===================================================");
      if (mapReduceTest.isGenerateDatas()) {
        System.out.println("The maxValue is:" + mapReduceTest.getMaxValue());
        System.out.println("The minValue is:" + mapReduceTest.getMinValue());
      }
      System.out.println("Spend time:" + (System.currentTimeMillis() - begin));
      // Spend time:13361
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
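The reducer above sorts by collecting every row into a java.util.PriorityQueue and then polling it, which yields the rows in ascending order but keeps the whole dataset in reducer memory at once. A minimal standalone sketch of that same heap-based ordering, using a hypothetical Row class in place of MultiColumnWritable (not part of the original code), looks like this:

```java
import java.util.PriorityQueue;

public class HeapOrderSketch {
  // Hypothetical stand-in for MultiColumnWritable: rows compare by id,
  // mirroring ORDER BY ID ASC.
  static class Row implements Comparable<Row> {
    final long id;
    Row(long id) { this.id = id; }
    public int compareTo(Row o) { return Long.compare(id, o.id); }
  }

  // Same pattern as the reduce() method: add everything to the heap,
  // then poll in ascending order. Every row is resident in memory,
  // which is why this approach overflows on large inputs.
  static long[] sortIds(long[] ids) {
    PriorityQueue<Row> queue = new PriorityQueue<Row>();
    for (long id : ids) queue.add(new Row(id));
    long[] out = new long[ids.length];
    for (int i = 0; !queue.isEmpty(); i++) out[i] = queue.poll().id;
    return out;
  }

  public static void main(String[] args) {
    for (long id : sortIds(new long[]{42, 7, 19})) {
      System.out.println(id);  // prints 7, 19, 42
    }
  }
}
```

The polling order follows each element's compareTo, so sorting on a different column is just a different compareTo; the memory cost, however, is proportional to the full input size, which is the flaw the next post addresses.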
