ChainMapper和ChainReducer处理数据流程示例
package com.oncedq.code;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.text.SimpleDateFormat;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextInputFormat;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.mapred.jobcontrol.Job;import org.apache.hadoop.mapred.jobcontrol.JobControl;import org.apache.hadoop.mapred.lib.ChainMapper;import com.oncedq.code.util.DateUtil;public class ProcessSample {public static class ExtractMappper extends MapReduceBase implementsMapper<LongWritable, Text, LongWritable, Conn1> {@Overridepublic void map(LongWritable arg0, Text arg1,OutputCollector<LongWritable, Conn1> arg2, Reporter arg3)throws IOException {String line = arg1.toString();String[] strs = line.split(";");Conn1 conn1 = new Conn1();conn1.orderKey = Long.parseLong(strs[0]);conn1.customer = Long.parseLong(strs[1]);conn1.state = strs[2];conn1.price = Double.parseDouble(strs[3]);conn1.orderDate = DateUtil.getDateFromString(strs[4], "yyyy-MM-dd");LongWritable lw = new LongWritable(conn1.orderKey);arg2.collect(lw, conn1);}}private static class Conn1 implements WritableComparable<Conn1> {public long orderKey;public long customer;public String state;public double price;public java.util.Date orderDate;@Overridepublic void readFields(DataInput in) throws IOException {orderKey = in.readLong();customer = in.readLong();state = Text.readString(in);price = in.readDouble();orderDate = DateUtil.getDateFromString(Text.readString(in),"yyyy-MM-dd");}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(orderKey);out.writeLong(customer);Text.writeString(out, state);out.writeDouble(price);Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));}@Overridepublic int compareTo(Conn1 arg0) {// TODO Auto-generated method stubreturn 0;}}public static class Filter1Mapper extends MapReduceBase implementsMapper<LongWritable, Conn1, LongWritable, Conn2> {@Overridepublic void map(LongWritable inKey, Conn1 c2,OutputCollector<LongWritable, Conn2> collector, Reporter report)throws IOException {if (c2.state.equals("F")) {Conn2 inValue = new Conn2();inValue.customer = c2.customer;inValue.orderDate = c2.orderDate;inValue.orderKey = c2.orderKey;inValue.price = c2.price;inValue.state = c2.state;collector.collect(inKey, inValue);}}}private static class Conn2 implements WritableComparable<Conn1> {public long orderKey;public long customer;public String state;public double price;public java.util.Date orderDate;@Overridepublic void readFields(DataInput in) throws IOException {orderKey = in.readLong();customer = in.readLong();state = Text.readString(in);price = in.readDouble();orderDate = DateUtil.getDateFromString(Text.readString(in),"yyyy-MM-dd");}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(orderKey);out.writeLong(customer);Text.writeString(out, state);out.writeDouble(price);Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));}@Overridepublic int compareTo(Conn1 arg0) {// TODO Auto-generated method stubreturn 0;}}public static class RegexMapper extends MapReduceBase implementsMapper<LongWritable, Conn2, LongWritable, Conn3> {@Overridepublic void map(LongWritable inKey, Conn2 c3,OutputCollector<LongWritable, Conn3> collector, Reporter report)throws IOException {c3.state = c3.state.replaceAll("F", "Find");Conn3 c2 = new Conn3();c2.customer = c3.customer;c2.orderDate = c3.orderDate;c2.orderKey = c3.orderKey;c2.price = c3.price;c2.state = c3.state;collector.collect(inKey, c2);}}private static class Conn3 implements WritableComparable<Conn1> {public long orderKey;public long customer;public String state;public double price;public java.util.Date orderDate;@Overridepublic void readFields(DataInput in) throws IOException {orderKey = in.readLong();customer = in.readLong();state = Text.readString(in);price = in.readDouble();orderDate = DateUtil.getDateFromString(Text.readString(in),"yyyy-MM-dd");}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(orderKey);out.writeLong(customer);Text.writeString(out, state);out.writeDouble(price);Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));}@Overridepublic int compareTo(Conn1 arg0) {// TODO Auto-generated method stubreturn 0;}}public static class LoadMapper extends MapReduceBase implementsMapper<LongWritable, Conn3, LongWritable, Conn3> {@Overridepublic void map(LongWritable arg0, Conn3 arg1,OutputCollector<LongWritable, Conn3> arg2, Reporter arg3)throws IOException {arg2.collect(arg0, arg1);}}public static void main(String[] args) {JobConf job = new JobConf(ProcessSample.class);job.setJobName("ProcessSample");job.setNumReduceTasks(0);job.setInputFormat(TextInputFormat.class);job.setOutputFormat(TextOutputFormat.class);JobConf mapper1 = new JobConf();JobConf mapper2 = new JobConf();JobConf mapper3 = new JobConf();JobConf mapper4 = new JobConf();ChainMapper cm = new ChainMapper();cm.addMapper(job, ExtractMappper.class, LongWritable.class, Text.class,LongWritable.class, Conn1.class, true, mapper1);cm.addMapper(job, Filter1Mapper.class, LongWritable.class, Conn1.class,LongWritable.class, Conn2.class, true, mapper2);cm.addMapper(job, RegexMapper.class, LongWritable.class, Conn2.class,LongWritable.class, Conn3.class, true, mapper3);cm.addMapper(job, LoadMapper.class, LongWritable.class, Conn3.class,LongWritable.class, Conn3.class, true, mapper4);FileInputFormat.setInputPaths(job, new Path("orderData"));FileOutputFormat.setOutputPath(job, new Path("orderDataOutput"));Job job1;try {job1 = new Job(job);JobControl jc = new JobControl("test");jc.addJob(job1);jc.run();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}?如果想了解程序的具体意思,探讨ChainMapper和ChainReducer在数据处理流程中的应用,请加我QQ:405078363 1 楼 Genie13 2012-05-17 很不错的例子啊