
Advanced MapReduce Programming: Local Aggregation and the Combiner

2012-06-30 

1. Principles of the Map/Reduce Programming Model

The computation takes a set of input key/value pairs and produces a set of output key/value pairs. Users of the MapReduce library express this computation as two functions: Map and Reduce.

In Hadoop, a Map/Reduce program is written mainly by extending the Mapper and Reducer base classes and overriding their map and reduce methods.
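As a minimal sketch of that pattern (these classes are placeholders for illustration, not part of the example programs below), a mapper and reducer written against the org.apache.hadoop.mapreduce API look roughly like this:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical skeleton: types and logic are placeholders, not the article's example.
public class SkeletonMapper extends Mapper<Object, Text, Text, LongWritable> {
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // turn one input record into zero or more intermediate (key, value) pairs
        context.write(new Text(value.toString()), new LongWritable(1));
    }
}

class SkeletonReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();   // fold all values that arrived under the same key
        }
        context.write(key, new LongWritable(sum));
    }
}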

Mapper

Mapper maps input key/value pairs to a set of intermediate key/value pairs. Maps are the individual tasks that transform input records into intermediate records; the intermediate records need not be of the same type as the input records, and a given input pair may map to zero or more output pairs.

The Mapper's output is sorted and then partitioned among the Reducers. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which key (and hence which records) goes to which Reducer by implementing a custom Partitioner. They may also specify a combiner via JobConf.setCombinerClass(Class); the combiner performs local aggregation of the intermediate output, which helps cut down the amount of data transferred from the Mapper to the Reducer. The sorted intermediate output is stored in the format (key-len, key, value-len, value); through the JobConf, applications can control whether and how this intermediate output is compressed and which CompressionCodec is used.
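For illustration, the knobs mentioned above are all set on the job configuration. The examples in this article use the newer org.apache.hadoop.mapreduce.Job API rather than JobConf, so a rough equivalent looks like the sketch below; MyPartitioner and MyCombiner are placeholder classes, and the compression property names shown are the pre-2.x ones and may differ in other Hadoop versions:

// Hypothetical driver excerpt: tune partitioning, combining and map-output compression.
Configuration conf = new Configuration();
// Compress the sorted map output before it is shuffled to the reducers.
conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec",
        org.apache.hadoop.io.compress.GzipCodec.class,
        org.apache.hadoop.io.compress.CompressionCodec.class);

Job job = new Job(conf, "number sum");
job.setPartitionerClass(MyPartitioner.class); // which key is sent to which Reducer (placeholder class)
job.setCombinerClass(MyCombiner.class);       // local aggregation of the map output (placeholder class)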
How many maps are needed? The number of maps is usually driven by the size of the input, typically the total number of blocks of the input files. The normal level of parallelism is roughly 10 to 100 maps per node, though it can be pushed to around 300 for very CPU-light map tasks. Since each task takes some time to initialize, it is best if a map runs for at least a minute. So with 10 TB of input data and a block size of 128 MB you end up with roughly 82,000 maps, unless setNumMapTasks(int) is used to set it even higher (note that this is only a hint to the framework, not a hard setting).
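With the older org.apache.hadoop.mapred.JobConf API that setNumMapTasks(int) belongs to, the hint might be given roughly as follows (a sketch only; the framework still derives the real number of maps from the input splits):

// Hypothetical fragment using the old-style JobConf API.
org.apache.hadoop.mapred.JobConf jobConf = new org.apache.hadoop.mapred.JobConf(NumberSum.class);
// 10 TB of input at 128 MB per block is about 81,920 blocks, i.e. roughly 82,000 maps by default;
// setNumMapTasks can only suggest a higher number, it cannot force a lower one.
jobConf.setNumMapTasks(100000);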
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class NumberSum {

    // Tokenize each input line and sum its numbers in the map phase
    public static class SumMapper extends
            Mapper<Object, Text, Text, LongWritable> {

        private Text word = new Text("sum");
        private LongWritable numValue = new LongWritable(1);

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            long sum = 0;
            while (itr.hasMoreTokens()) {
                String s = itr.nextToken();
                long val = Long.parseLong(s);
                sum += val;
            }
            numValue.set(sum);
            context.write(word, numValue);
        }
    }

    // Aggregate the per-line sums and write the final total
    public static class SumReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {

        private LongWritable result = new LongWritable();
        private Text k = new Text("sum");

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable val : values) {
                long v = val.get();
                sum += v;
            }
            result.set(sum);
            context.write(k, result);
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: numbersum <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "number sum");
        job.setJarByClass(NumberSum.class);
        job.setMapperClass(SumMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
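In this first version there is no combiner at all: each map call sums the numbers on one input line locally and emits a single pair under the fixed key "sum" (for example, the line "1 2 3" produces ("sum", 6)), so every intermediate record shares the same key and ends up in one reduce group, where the per-line totals are added into the final result. The class is compiled into a jar and run with two arguments, the input and output paths.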

2. Implementation of a Second Approach
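In this second approach the mapper does no summing of its own: it emits every number token as a key with the value 1. Local aggregation is delegated to a combiner, which counts how often each number occurs in a map task's output, multiplies the number by that count, and re-emits the partial sum under the key "midsum"; records that already carry the "midsum" key are passed through unchanged. The reducer then adds up all the partial sums and writes the total under the key "sum".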

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class NumberSum {

    // Emit each number token as a key with a count of 1
    public static class NumSumMapper extends
            Mapper<Object, Text, Text, LongWritable> {

        private Text word = new Text();
        private LongWritable numValue = new LongWritable(1);

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String s = itr.nextToken();
                word.set(s);
                context.write(word, numValue);
            }
        }
    }

    // Local aggregation: multiply each number by its occurrence count
    // and emit the partial sum under the "midsum" key
    public static class SumCombiner extends
            Reducer<Text, LongWritable, Text, LongWritable> {

        private LongWritable result = new LongWritable();
        private Text k = new Text("midsum");

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            long sum = 0;
            if (!key.toString().startsWith("midsum")) {
                for (LongWritable val : values) {
                    sum += val.get();
                }
                long kval = Long.parseLong(key.toString());
                long v = kval * sum;
                result.set(v);
                context.write(k, result);
            } else {
                // Already combined once; pass the partial sums through unchanged
                for (LongWritable val : values) {
                    context.write(key, val);
                }
            }
        }
    }

    // Sum all partial sums and write the final total
    public static class SumReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {

        private LongWritable result = new LongWritable();
        private Text k = new Text("sum");

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable val : values) {
                long v = val.get();
                sum += v;
            }
            result.set(sum);
            context.write(k, result);
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: numbersum <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "number sum");
        job.setJarByClass(NumberSum.class);
        job.setMapperClass(NumSumMapper.class);
        job.setCombinerClass(SumCombiner.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
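One design note: Hadoop treats the combiner as an optional optimization and may apply it zero, one, or several times to the map output, which is why the combiner above checks for the "midsum" prefix and passes already-combined records through untouched. Because the reducer simply sums whatever values it receives, this particular job still relies on the combiner actually running at least once over the raw number keys; a more defensive variant would let the reducer also recognize raw (number, count) pairs.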

References:

[1] 魏仁言 (Wei Renyan), "Hadoop Map/Reduce编程模型实现海量数据处理—数字求和" (Processing Massive Data with the Hadoop Map/Reduce Programming Model: Summing Numbers).

[2] Fth-Hokage, "MapReduce高级编程之本地聚集与Combinner" (Advanced MapReduce Programming: Local Aggregation and the Combiner).