
MapReduce Join Operations (Part 2)

2012-07-27 

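TaggedMapOutput, the base class for the tagged values emitted by the map phase, is defined as follows:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobConf;

public abstract class TaggedMapOutput implements Writable {

    // Tag identifying which data source (table) this record came from
    protected Text tag;

    public TaggedMapOutput() {
        this.tag = new Text("");
    }

    public Text getTag() {
        return tag;
    }

    public void setTag(Text tag) {
        this.tag = tag;
    }

    // The record payload, supplied by concrete subclasses
    public abstract Writable getData();

    // Deep copy through Writable serialization
    public TaggedMapOutput clone(JobConf job) {
        return (TaggedMapOutput) WritableUtils.clone(this, job);
    }
}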
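To make the Writable contract concrete, here is a minimal plain-Java sketch with no Hadoop dependency: a tagged record that serializes its tag before its payload and copies itself through a serialize/deserialize round trip, which is the idea behind WritableUtils.clone. The class name TaggedRecord and the use of String with writeUTF in place of Text are assumptions for illustration; this is not the Hadoop implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical plain-Java stand-in for TaggedMapOutput (not Hadoop code):
// String replaces Text, and java.io streams replace Hadoop's buffers.
class TaggedRecord {
    String tag = "";   // which data source (table) the record came from
    String data = "";  // the record payload

    // Empty instance that readFields fills in; a copy must start from one.
    TaggedRecord() { }

    TaggedRecord(String tag, String data) {
        this.tag = tag;
        this.data = data;
    }

    // Serialize the tag first, then the payload.
    void write(DataOutput out) throws IOException {
        out.writeUTF(tag);
        out.writeUTF(data);
    }

    // Read the fields back in exactly the order write() produced them.
    void readFields(DataInput in) throws IOException {
        tag = in.readUTF();
        data = in.readUTF();
    }

    // Deep copy via a serialize/deserialize round trip.
    TaggedRecord copy() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            write(out);
            out.flush();
            TaggedRecord result = new TaggedRecord();
            result.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A copy like this matters on the reduce side, where the value iterator may reuse one object for successive records. It also shows why a no-arg constructor is required: the copy starts from an empty instance and is filled in by readFields.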

Next, let's look at the relevant methods in DataJoinMapperBase:

protected abstract TaggedMapOutput generateTaggedMapOutput(Object value);

protected abstract Text generateGroupKey(TaggedMapOutput aRecord);

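Both methods must be implemented by subclasses. As mentioned in the previous article, the join key shared by the two tables is used as the key of the map output. The second method does exactly that: it produces a key of type Text, which this framework simply calls the GroupKey. With that in mind, the map method is easy to follow: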

public void map(Object key, Object value, OutputCollector output,
                Reporter reporter) throws IOException {
    if (this.reporter == null) {
        this.reporter = reporter;
    }
    addLongValue("totalCount", 1);
    TaggedMapOutput aRecord = generateTaggedMapOutput(value);
    if (aRecord == null) {
        addLongValue("discardedCount", 1);
        return;
    }
    Text groupKey = generateGroupKey(aRecord);
    if (groupKey == null) {
        addLongValue("nullGroupKeyCount", 1);
        return;
    }
    output.collect(groupKey, aRecord);
    addLongValue("collectedCount", 1);
}
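The control flow of map() can be imitated without Hadoop. The sketch below is hypothetical: MapSketch, its nested classes, and the rule that blank lines are discarded are assumptions chosen to exercise each counter, and the group key is taken as the first comma-separated field, as in a typical CSV join.

```java
import java.util.List;

// Hypothetical plain-Java imitation of DataJoinMapperBase.map (not Hadoop code).
class MapSketch {
    // Stand-in for a TaggedMapOutput: source tag plus the raw line.
    static final class Tagged {
        final String tag;
        final String line;
        Tagged(String tag, String line) { this.tag = tag; this.line = line; }
    }

    // Stand-in for one (groupKey, record) pair passed to output.collect.
    static final class Emitted {
        final String groupKey;
        final Tagged value;
        Emitted(String groupKey, Tagged value) { this.groupKey = groupKey; this.value = value; }
    }

    // Counters named after the ones map() updates with addLongValue.
    int totalCount, discardedCount, nullGroupKeyCount, collectedCount;

    // Like generateTaggedMapOutput; discarding blank lines is an assumption.
    Tagged generateTagged(String inputTag, String line) {
        return line.isEmpty() ? null : new Tagged(inputTag, line);
    }

    // Like generateGroupKey: the first comma-separated field, or null if empty.
    String generateGroupKey(Tagged record) {
        String key = record.line.split(",", 2)[0];
        return key.isEmpty() ? null : key;
    }

    // Same sequence as map(): count, tag, derive the key, then emit.
    void map(String inputTag, String line, List<Emitted> output) {
        totalCount++;
        Tagged record = generateTagged(inputTag, line);
        if (record == null) {
            discardedCount++;
            return;
        }
        String key = generateGroupKey(record);
        if (key == null) {
            nullGroupKeyCount++;
            return;
        }
        output.add(new Emitted(key, record));
        collectedCount++;
    }
}
```

Records from both tables that share a first field end up under the same group key, which is what lets the shuffle deliver them to a single reduce call.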

With the map side covered, we move on to the reduce phase. In the DataJoinReducerBase class, the core of the reduce method is:

public void reduce(Object key, Iterator values,
                   OutputCollector output, Reporter reporter) throws IOException {
    if (this.reporter == null) {
        this.reporter = reporter;
    }
    // Bucket the values for this group key by their source tag
    SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter);
    Object[] tags = groups.keySet().toArray();
    ResetableIterator[] groupValues = new ResetableIterator[tags.length];
    for (int i = 0; i < tags.length; i++) {
        groupValues[i] = groups.get(tags[i]);
    }
    // Combine one value from each source, for every such combination
    joinAndCollect(tags, groupValues, key, output, reporter);
    addLongValue("groupCount", 1);
    for (int i = 0; i < tags.length; i++) {
        groupValues[i].close();
    }
}
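The following hypothetical plain-Java sketch shows what regroup and joinAndCollect accomplish for one group key: values are bucketed by source tag in a sorted map, then a combine function is invoked once per combination that takes one value from each tag. The class name, the String[] {tag, line} encoding, and the BiFunction standing in for combine are assumptions; the real classes operate on TaggedMapOutput and ResetableIterator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Hypothetical sketch of the reduce side for a single group key (not Hadoop code).
class ReduceSketch {
    // values: {tag, line} pairs that arrived under the same group key.
    static SortedMap<String, List<String>> regroup(List<String[]> values) {
        SortedMap<String, List<String>> groups = new TreeMap<>();
        for (String[] v : values) {
            groups.computeIfAbsent(v[0], t -> new ArrayList<>()).add(v[1]);
        }
        return groups;
    }

    // Calls combine(tags, oneValuePerTag) for every cross-product combination;
    // a null result drops that combination.
    static List<String> joinAndCollect(SortedMap<String, List<String>> groups,
                                       BiFunction<String[], String[], String> combine) {
        String[] tags = groups.keySet().toArray(new String[0]);
        List<String> output = new ArrayList<>();
        if (tags.length == 0) {
            return output;
        }
        collect(0, tags, new String[tags.length], groups, combine, output);
        return output;
    }

    // Recursively pick one value for position pos, then recurse to pos + 1.
    private static void collect(int pos, String[] tags, String[] partial,
                                SortedMap<String, List<String>> groups,
                                BiFunction<String[], String[], String> combine,
                                List<String> output) {
        if (pos == tags.length) {
            String joined = combine.apply(tags, partial.clone());
            if (joined != null) {
                output.add(joined);
            }
            return;
        }
        for (String value : groups.get(tags[pos])) {
            partial[pos] = value;
            collect(pos + 1, tags, partial, groups, combine, output);
        }
    }
}
```

With two sources, a key that has m records in one table and n in the other leads to m × n calls to combine, so keys with many matches on both sides are the expensive ones.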

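Subclasses of DataJoinReducerBase must implement the following abstract method. joinAndCollect calls it once for each combination of records (one from each data source) that share the group key, and a null return value means that combination is not written out:

protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);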

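Putting the pieces together, a complete join job built on these classes:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {

        // Tag = the part of the input file name before the first '-'
        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        // Group (join) key = the first comma-separated field of the record
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        // Wrap the raw line and attach this mapper's input tag
        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }

    public static class Reduce extends DataJoinReducerBase {

        // Inner join: drop group keys found in only one source; otherwise
        // concatenate the non-key fields of each record
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        // No-arg constructor: Hadoop instantiates values reflectively before
        // calling readFields, so deserialization fails without it. Both inputs
        // here are text, so data starts out as an empty Text.
        public TaggedWritable() {
            this.tag = new Text("");
            this.data = new Text("");
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, DataJoin.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        // Separate output key and value with "," instead of the default tab
        job.set("mapred.textoutputformat.separator", ",");

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
        System.exit(res);
    }
}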
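To see what the job produces without a cluster, the following hypothetical in-memory simulation applies the same three rules as DataJoin (tag from the file-name prefix before '-', group key from the first field, inner-join combine) and imitates the shuffle with a TreeMap. The class name, file names, and records are made-up sample data, not output from a real run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory walk-through of the DataJoin job (no Hadoop).
class DataJoinSimulation {
    // Same rule as MapClass.generateInputTag: text before the first '-'.
    static String inputTag(String fileName) {
        return fileName.split("-")[0];
    }

    // Same rule as Reduce.combine: inner join, drop the key from each record
    // and join the remaining fields with commas.
    static String combine(String[] values) {
        if (values.length < 2) {
            return null;
        }
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                joined.append(',');
            }
            joined.append(values[i].split(",", 2)[1]);
        }
        return joined.toString();
    }

    // files: file name -> lines. Returns "key,joined fields" output lines.
    static List<String> run(Map<String, List<String>> files) {
        // groupKey -> (tag -> records); the inner TreeMap keeps tags sorted.
        Map<String, TreeMap<String, List<String>>> shuffled = new TreeMap<>();
        for (Map.Entry<String, List<String>> file : files.entrySet()) {
            String tag = inputTag(file.getKey());
            for (String line : file.getValue()) {
                String key = line.split(",")[0];
                shuffled.computeIfAbsent(key, k -> new TreeMap<>())
                        .computeIfAbsent(tag, t -> new ArrayList<>())
                        .add(line);
            }
        }
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, TreeMap<String, List<String>>> group : shuffled.entrySet()) {
            List<List<String>> perTag = new ArrayList<>(group.getValue().values());
            cross(perTag, 0, new String[perTag.size()], group.getKey(), output);
        }
        return output;
    }

    // Cross product over the sources, one record per source per combination.
    private static void cross(List<List<String>> perTag, int pos, String[] picked,
                              String key, List<String> output) {
        if (pos == perTag.size()) {
            String joined = combine(picked);
            if (joined != null) {
                output.add(key + "," + joined);  // "," mirrors the configured separator
            }
            return;
        }
        for (String value : perTag.get(pos)) {
            picked[pos] = value;
            cross(perTag, pos + 1, picked, key, output);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> files = new LinkedHashMap<>();
        files.put("Customers-sample", Arrays.asList("1,Alice,555-0100", "2,Bob,555-0101"));
        files.put("Orders-sample", Arrays.asList("1,A,12.95", "1,B,88.25", "3,D,25.02"));
        for (String line : run(files)) {
            System.out.println(line);
        }
        // prints:
        // 1,Alice,555-0100,A,12.95
        // 1,Alice,555-0100,B,88.25
    }
}
```

Customer 2 has no orders and order key 3 has no customer, so combine sees only one source for those keys and returns null; that is why the job behaves as an inner join.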
