MapReduce高级编程——自定义InputFormat——深入理解
0、本文承接上文《MapReduce高级编程——自定义InputFormat》
import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;public class Point3DMapper extends Mapper<Text, Point3D, Text, Point3D>{protected void map(Text key, Point3D value, Context context) throws IOException, InterruptedException{context.write(key, value);}}
进入Mapper源代码可以看到:
即,/**
而,// Text -> ball etc. -> KEYOUT
同理,我们查看Reducer源代码,可以看到,Mapper的KEYOUT类型和VALUEOUT类型,必须对应Reducer的KEYIN类型和VALUEIN类型。
写作本文,是因为有些时候文档不是很清楚,而最好的方法是看源代码。“源码之前,了无秘密”。
对于MapReduce的细致执行流程,我推荐阅读例子经典、作者水平一流的博客 Map Reduce – the Free Lunch is not over?。相信看完会有所收获!
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public class Context extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { public Context(Configuration conf, TaskAttemptID taskid, RecordReader<KEYIN,VALUEIN> reader, RecordWriter<KEYOUT,VALUEOUT> writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) throws IOException, InterruptedException { super(conf, taskid, reader, writer, committer, reporter, split); } } /** * Called once at the beginning of the task. */ protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * Called once for each key/value pair in the input split. Most applications * should override this, but the default is the identity function. */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); }
 * Text -> KEYIN
 * Point3D -> VALUEIN
 * Text -> KEYOUT
 * Point3D -> VALUEOUT
 **/
Mapper<Text, Point3D, Text, Point3D>
// value -> .5, 12.7, 9.0 etc. -> VALUEOUT
context.write(key, value);
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { public class Context extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { public Context(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriter<KEYOUT,VALUEOUT> output, OutputCommitter committer, StatusReporter reporter, RawComparator<KEYIN> comparator, Class<KEYIN> keyClass, Class<VALUEIN> valueClass ) throws IOException, InterruptedException { super(conf, taskid, input, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass); } } /** * Called once at the start of the task. */ protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * This method is called once for each key. Most applications will define * their reduce class by overriding this method. The default implementation * is an identity function. */ @SuppressWarnings("unchecked") protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context ) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } }