首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

MapReduce : Combiner的使用(以平均数为例) 并结合in-地图per design pattern 实例

2012-08-26 
MapReduce : Combiner的使用(以平均数为例) 并结合in-mapper design pattern 实例?没有使用Combiner 和 in

MapReduce : Combiner的使用(以平均数为例) 并结合in-mapper design pattern 实例

?

没有使用Combiner 和 in-mapper desgin patternimport java.io.IOException;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.KeyValueTextInputFormat;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.Reducer.Context;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class digitaver1 {public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable>{@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String[] ss = value.toString().split(":");context.write(new Text(ss[0]), new IntWritable(Integer.parseInt(ss[1])));}}public static class reducer extends Reducer<Text, IntWritable, Text, DoubleWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> value,Context context) throws IOException, InterruptedException {int sum = 0;int cnt = 0;while(value.iterator().hasNext()){sum += value.iterator().next().get();cnt+=1;}context.write(key, new DoubleWritable((double)sum/(double)cnt));}}public static void main(String[] args) {try {Job job = new Job();job.setJarByClass(digitaver1.class);job.setJobName("digitaver1");FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setMapperClass(mapper.class);job.setReducerClass(reducer.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(DoubleWritable.class);System.exit( job.waitForCompletion(true) ? 0 : 1 );} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}使用Combinerpublic static class mapper extends Mapper<LongWritable, Text, Text, pair>{@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String[] ss = value.toString().split(":");pair p = new pair(Integer.parseInt(ss[1]), 1);context.write(new Text(ss[0]), p);}}public static class combiner extends Reducer<Text, pair, Text, pair>{@Overrideprotected void reduce(Text key, Iterable<pair> value,Context context)throws IOException, InterruptedException {int sum = 0;int cnt = 0;while(value.iterator().hasNext()){pair p = value.iterator().next();sum += p.getLeft().get();cnt += p.getRight().get();}context.write(key, new pair(sum,cnt));}}public static class reducer extends Reducer<Text, pair, Text, DoubleWritable>{@Overrideprotected void reduce(Text key, Iterable<pair> value,Context context) throws IOException, InterruptedException {int sum = 0;int cnt = 0;while(value.iterator().hasNext()){pair p = value.iterator().next();sum += p.getLeft().get();cnt += p.getRight().get();}context.write(key, new DoubleWritable((double)sum/(double)cnt));}}main函数都一样使用in-mapper design patternpublic static class mapper extends Mapper<LongWritable, Text, Text, pair>{private Map<String,String> map ;@Overrideprotected void setup(Context context) throws IOException,InterruptedException {// TODO Auto-generated method stubmap = new HashMap<String, String>();}//处理完所有的输入文件再一起传给reducer或者combiner//以前map在执行过程中会一边执行一边讲输出的部分结构先传输给reducer  按照上面的话  效率会不会受影响?//虽然数据少了,但是开始的时间也推迟了??堵塞延迟小了??//负载平衡??网络中总的数据量少了??@Overrideprotected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {String[] ss = value.toString().split(":");if(!map.containsKey(ss[0])){map.put(ss[0], ss[1]+":"+1);}else{String tmp = map.get(ss[0]);String[] tt = tmp.split(":");int ta = Integer.parseInt(ss[1])+Integer.parseInt(tt[0]);int tb = Integer.parseInt(tt[1])+1;map.put(ss[0], ta+":"+tb);}}@Overrideprotected void cleanup(Context context) throws IOException,InterruptedException {for(Map.Entry<String, String> e : map.entrySet()){String[] tt = e.getValue().split(":");pair p = new pair(Integer.parseInt(tt[0]), Integer.parseInt(tt[1]));context.write(new Text(e.getKey()), p);}}}public static class reducer extends Reducer<Text, pair, Text, DoubleWritable>{@Overrideprotected void reduce(Text key, Iterable<pair> value,Context context) throws IOException, InterruptedException {int sum = 0;int cnt = 0;while(value.iterator().hasNext()){pair p = value.iterator().next();sum += p.getLeft().get();cnt += p.getRight().get();}context.write(key, new DoubleWritable((double)sum/(double)cnt));}}
in-mapper design pattern:单个mapper结果进行聚集
Combiner:所有的mapper结果进行聚集

热点排行