首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

map reduce 任务串联

2013-07-20 
map reduce 任务串联:import com.vividsolutions.jts.io.WKTReader; import org.apache.hadoop.conf.Configu…

map reduce 任务串联

import com.vividsolutions.jts.io.WKTReader;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;import java.text.SimpleDateFormat;public class DependencyJob {    static class FirstMapper extends Mapper<LongWritable, Text, Text, LongWritable> {        @Override        public void map(LongWritable key, Text value, Context context) {            String[] a = value.toString().split("\t");            String time = a[1];            try {                context.write(new Text(time), new LongWritable(1L));            } catch (IOException e) {                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.            } catch (InterruptedException e) {                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.            }        }    }    static class FirstReducer extends Reducer<Text, LongWritable, Text, LongWritable> {        @Override        public void reduce(Text key, Iterable<LongWritable> values, Context context) {            long a = 0;            for(LongWritable l : values){                a += l.get();            }            try {                context.write(new Text(key), new LongWritable(a));            } catch (IOException e) {                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.            
} catch (InterruptedException e) {                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.            }        }    }    static class SecondMapper extends Mapper<LongWritable, Text, Text, LongWritable> {        @Override        public void map(LongWritable key, Text value, Context context) {            String[] a = value.toString().split("\t");            String time = a[0];            Long l = time == null? 0: Long.parseLong(time)%1000;            try {                context.write(new Text(l.toString()), new LongWritable(1L));            } catch (IOException e) {                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.            } catch (InterruptedException e) {                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.            }        }    }    static class SecondReducer extends Reducer<Text, LongWritable, Text, LongWritable> {        @Override        public void reduce(Text key, Iterable<LongWritable> values, Context context) {            long a = 0;            for(LongWritable l : values){                a += l.get();            }            try {                context.write(new Text(key), new LongWritable(a));            } catch (IOException e) {                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.            } catch (InterruptedException e) {                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.            
}        }    }    public static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");    public static long stime, etime;    public static WKTReader rd = new WKTReader();    public static void main(String[] args) throws Exception {        if (args.length != 3) {            System.out.println("lack arg");            System.exit(0);        }        Configuration configuration = new Configuration();        JobControl jobControl = new JobControl("dependency");        Job job = new Job();        job.setJobName("first");        job.setJarByClass(DependencyJob.class);        FileInputFormat.addInputPath(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        job.setMapperClass(FirstMapper.class);        job.setReducerClass(FirstReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(LongWritable.class);        job.setNumReduceTasks(8);//        ControlledJob controlledJob = new ControlledJob(configuration);        controlledJob.setJob(job);        Job job2 = new Job();        job2.setJobName("second");        job2.setJarByClass(DependencyJob.class);        FileInputFormat.addInputPath(job2, new Path(args[1]));        FileOutputFormat.setOutputPath(job2, new Path(args[2]));        job2.setMapperClass(SecondMapper.class);        job2.setReducerClass(SecondReducer.class);        job2.setOutputKeyClass(Text.class);        job2.setOutputValueClass(LongWritable.class);        job.setNumReduceTasks(8);//        ControlledJob controlledJob2 = new ControlledJob(configuration);        controlledJob2.setJob(job2);        controlledJob2.addDependingJob(controlledJob);        jobControl.addJob(controlledJob);        jobControl.addJob(controlledJob2);        jobControl.run();    }}

热点排行