首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

Hadoop应用系列2-MapReduce原理浅析(上)

2013-02-04 
Hadoop应用系列2--MapReduce原理浅析(下)上面我们分析气温的那段程序,看起没有问题,用起来也没有问题。试想

Hadoop应用系列2--MapReduce原理浅析(下)
上面我们分析气温的那段程序,看起没有问题,用起来也没有问题。

试想一下,如果我们把全球所有气象站的数据拿来分析, 你的程序大约需要多久能计算出结果?或者说能否完成运算?
以前我们会把来自不同气象站的数据在不同时间,或者不同计算机上进行运算,最后把结果拿来,再次执行运算。

有了MapReduce咱就不用这么麻烦了,MapReduce做了这样几件事:
0、分布式并行
1、他把输入和输出分开。Mapper负责读取数据,把需要计算的数据输出给Reducer,
也就是说,我们刚才写的程序addYearAndTemperature和out他们是天然的2个程序。
2、本身就是为分布式而设计的,他会把来自多个输入(map)的结果自动的合并并输出(reduce)
3、总是读取本地数据进行运算(相当于我们让不同的计算机分析来自不同气象站的数据)

不知道我说清楚了没有,如果有不清楚的地方欢迎讨论和拍砖。

下面我们来使用hadoop来编写MapReduce程序,来完成同样的功能,注意Mapper是Reducer的输入哦。

我们先来编写Mapper吧,就是把年份和气温取出的程序。

package org.pcwl.mapper;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;/** * <p><b>用于读取气象信息,从气象信息中分解出年份和气温的程序。</b></p> * <p> * MapReduce的mapper需要继承Mapper<T1,T2,T3,T4>类 * 重写map(T1,T2,T3,T4)方法 * </p> * <p> * T1表示mapper的输入key类型,T2表示mapper输入的value类型,我们可以把他理解为 * <em>hadoop的文件读取其,把数据读进来之后装入到一个java.util.Map<K,V>中, * 我们从这个java.util.Map<K,V>中读取数据, 这里T1就是这个K的类型.T2就是V的类型</em> * <i> * 注意:输入到mapper中的数据是以行为单位的(\n) 你懂的。 *  value是当前行,key可以看作是行号 * </i> * </p> * <p> * T3,T4表示mapper的输出类型,也是Reducer的输入类型,我们可以理解为<em> *  mapper程序把需要reducer计算的数据写入一个java.util.Map<K,V>中, *  T3,表示这个K的类型,T4表示这个V的类型。 * </em> * </p> * <p> * <strong> * 注意:这里类型不能使用java自身的类型, *  我们使用<i>org.apache.hadoop.io</i>这个包下的类型 *  ,他们是为分布式并行而设计的类型 *  LongWritable---视为java.lang.Long的替代类型 *  Text---视为java.lang.String的替代类型 *  IntWritable---视为java.lang.Integer的替代类型 *  </strong> * </p> * @author project_maker@sina.com;eastzhang.iteye.com * */public class Temperature  extends Mapper<LongWritable, Text, Text, IntWritable> {/* 已知的错误数据值 */private static final int MISSING = 9999;@Overridepublic void map(LongWritable key, Text value, Context context)      throws IOException, InterruptedException {    /* 从value取出值,得到String */    String line = value.toString();    /* 从字符串中提出年份信息 */    String year = line.substring(15, 19);    int temperature;    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs      temperature = Integer.parseInt(line.substring(88, 92));    } else {      temperature = Integer.parseInt(line.substring(87, 92));    }    if (temperature != MISSING) {      /* 把合法数据送入到reducer */      context.write(new Text(year), new IntWritable(temperature));    }}}


到此我的mapper编写完成,记住它相当于 addYearAndTemperature 这个哦。

接下来我们写 Reducer , 它想当于out

package org.pcwl.reducer;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;/** * <p>Reducer 程序需要继承<code>org.apache.hadoop.mapreduce.Reducer<T1,T2,T3,T4></code> * 重写reduce<T1,Iterable<T2>,T3,T4>方法。 * <p> * <p> * 这里T1,T2 是 Reducer的输入类型,也是Mapper的输出类型, *  <b>注意必须一致哟.<b> *  T3,T4是Reducer的输出类型,也就是写入文件中数据类型。 *  根据你程序要写入文件的结果确定类型吧。 * </p> * @author project_maker@sina.com;eastzhang.iteye.com * */public class Temperature  extends Reducer<Text, IntWritable, Text, IntWritable> {    @Override  /**   * <strong>注意这里values是个迭代器</strong>也就是说,Reducer是对一个集合进行处理,   * 这一点在@see http://eastzhang.iteye.com/blog/1775734有体现   */  public void reduce(Text key, Iterable<IntWritable> values,      Context context)      throws IOException, InterruptedException {    /* 要计算最高气温,我们来个无穷小,作为比较量 */    int maxValue = Integer.MIN_VALUE;        for (IntWritable value : values) {      maxValue = Math.max(maxValue, value.get());    }    /* key 是 年份, 我们把结果写入到文件中 */    context.write(key, new IntWritable(maxValue));  }}

到此, 我们Reducer程序编写完成。

让MapReduce程序运行, 我们需要些调度器(job)作业。
package org.pcwl.job;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** 运行MapReduce的程序(job)作业* @Author project_maker@sina.com; eastzhang@iteye.com*/public class Temperature {  public static void main(String[] args) throws Exception {    if (args.length != 2) {      System.err.println("请输入数据源文件夹<以存在>和目标文件夹<不存在>");      System.exit(-1);    }        Job job = new Job();    job.setJarByClass(Temperature.class);    job.setJobName("计算最高气温");    /* 指定从何出读取数据 */    FileInputFormat.addInputPath(job, new Path(args[0]));    /* 指定结果写入何处,注意文件夹必须不存在 */    FileOutputFormat.setOutputPath(job, new Path(args[1]));    /* 设置Mapper程序 */    job.setMapperClass(org.pcwl.mapper.Temperature.class);    /* 设置Reducer程序 */    job.setReducerClass(org.pcwl.reducer.Temperature.class);        /* 指定输出结果类型 */    job.setOutputKeyClass(Text.class);    job.setOutputValueClass(IntWritable.class);        System.exit(job.waitForCompletion(true) ? 0 : 1);  }}

到此我们真个MapReduce就搞定了, 打jar包运行它









下一篇我们将要研究一下Combiner


1 楼 Tracy-Lcz 昨天   顶你一个老兄

热点排行