MapReduce之Join操作(2)
public abstract class TaggedMapOutput implements Writable { protected Text tag; public TaggedMapOutput() { this.tag = new Text(""); } public Text getTag() { return tag; } public void setTag(Text tag) { this.tag = tag; } public abstract Writable getData(); public TaggedMapOutput clone(JobConf job) { return (TaggedMapOutput) WritableUtils.clone(this, job); }}?
接下来,我们看看DataJoinMapperBase中的相关方法
protected abstract TaggedMapOutput generateTaggedMapOutput(Object value);protected abstract Text generateGroupKey(TaggedMapOutput aRecord);
以上两个方法需要由子类实现。上一篇文章提到,将两个表的连接键作为map输出的key值,其中第二个方法所做的就是这件事,生成一个类型为Text的key,不过这里是将它称作是GroupKey而已。因此map方法也就比较简单易懂了
public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { if (this.reporter == null) { this.reporter = reporter; } addLongValue("totalCount", 1); TaggedMapOutput aRecord = generateTaggedMapOutput(value); if (aRecord == null) { addLongValue("discardedCount", 1); return; } Text groupKey = generateGroupKey(aRecord); if (groupKey == null) { addLongValue("nullGroupKeyCount", 1); return; } output.collect(groupKey, aRecord); addLongValue("collectedCount", 1);}说完了map操作,接下来就是reduce阶段的事情了。参看DataJoinReducerBase这个类,其中的reduce方法主要部分是:
public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { if (this.reporter == null) { this.reporter = reporter; } SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter); Object[] tags = groups.keySet().toArray(); ResetableIterator[] groupValues = new ResetableIterator[tags.length]; for (int i = 0; i < tags.length; i++) { groupValues[i] = groups.get(tags[i]); } joinAndCollect(tags, groupValues, key, output, reporter); addLongValue("groupCount", 1); for (int i = 0; i < tags.length; i++) { groupValues[i].close(); }}protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
public class DataJoin extends Confi gured implements Tool { public static class MapClass extends DataJoinMapperBase { protected Text generateInputTag(String inputFile) { String datasource = inputFile.split(“-”)[0]; return new Text(datasource); } protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); String[] tokens = line.split(“,”); String groupKey = tokens[0]; return new Text(groupKey); } protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } public static class Reduce extends DataJoinReducerBase { protected TaggedMapOutput combine(Object[] tags, Object[] values) { if (tags.length < 2) return null; String joinedStr = “”; for (int i=0; i<values.length; i++) { if (i > 0) joinedStr += “,”; TaggedWritable tw = (TaggedWritable) values[i]; String line = ((Text) tw.getData()).toString(); String[] tokens = line.split(“,”, 2); joinedStr += tokens[1]; } TaggedWritable retv = new TaggedWritable(new Text(joinedStr)); retv.setTag((Text) tags[0]); return retv; } } public static class TaggedWritable extends TaggedMapOutput { private Writable data; public TaggedWritable(Writable data) { this.tag = new Text(“”); this.data = data; } public Writable getData() { return data; } public void write(DataOutput out) throws IOException { this.tag.write(out); this.data.write(out); } public void readFields(DataInput in) throws IOException { this.tag.readFields(in); this.data.readFields(in); } } public int run(String[] args) throws Exception {Confi guration conf = getConf();JobConf job = new JobConf(conf, DataJoin.class);Path in = new Path(args[0]);Path out = new Path(args[1]);FileInputFormat.setInputPaths(job, in);FileOutputFormat.setOutputPath(job, out);job.setJobName(“DataJoin”);job.setMapperClass(MapClass.class);job.setReducerClass(Reduce.class);job.setInputFormat(TextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(TaggedWritable.class);job.set(“mapred.textoutputformat.separator”, “,”);JobClient.runJob(job);return 0;} public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Confi guration(),new DataJoin(),args);System.exit(res);}} ?
?
?
?