
Multiple File Output with the Newer Hadoop API

2013-04-20 

Written against Hadoop 1.0.1; it should work on Hadoop 1.0.1 and later versions.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultipleOutputFile
{
    public static class TokenizerMapper extends
            Mapper<LongWritable, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens())
            {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable>
    {
        private MultipleOutputs<Text, IntWritable> mos;

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException
        {
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
            {
                sum += val.get();
            }
            result.set(sum);
            // Write to the two named outputs in addition to the default job output.
            mos.write("text", key, new IntWritable(1));
            mos.write("seq", key, result);
            context.write(key, result);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException
        {
            // Closing the streams is mandatory; otherwise, with small inputs,
            // the data may stay in the buffers and never reach the output files.
            mos.close();
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");

        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJarByClass(MultipleOutputFile.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        // Register the named outputs referenced by the reducer.
        MultipleOutputs.addNamedOutput(job, "text",
                TextOutputFormat.class,
                Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "seq",
                TextOutputFormat.class,
                Text.class, IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
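Note that both named outputs above are registered with TextOutputFormat, so "text" and "seq" each end up as plain text files (text-r-00000, seq-r-00000, and so on, next to the regular part-r-00000 files). If the "seq" name is meant to produce a binary SequenceFile, the driver registration could instead point at SequenceFileOutputFormat. The following is only a minimal sketch of that alternative, assuming binary output is actually wanted:

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Register "seq" as a SequenceFile-backed named output instead of text
// (sketch only; the listing above keeps TextOutputFormat for both names).
MultipleOutputs.addNamedOutput(job, "seq",
        SequenceFileOutputFormat.class,
        Text.class, IntWritable.class);

With this registration the reducer code is unchanged: MultipleOutputs writes each record through whichever OutputFormat its name was registered with, so only the driver needs to change.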

