首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

hadoop--mapreduce代码之多表关联

2012-10-19 
hadoop--mapreduce代码之多表关联 package com.hadoop.sample; import java.io.IOException; import java.uti…

hadoop--mapreduce代码之多表关联

package com.hadoop.sample;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Reduce-side join of two text tables on an address ID.
 *
 * <p>The "left" table holds lines of the form {@code <factoryname> <addressID>} and the
 * "right" table holds lines of the form {@code <addressID> <addressname>}. The mapper tags
 * every record with the table it came from and keys it by address ID; the reducer pairs
 * every factory with every address that shares the same ID.
 */
public class MTJoin {

    /** Tag prefixed to mapper values that originate from the left (factory) table. */
    private static final String LEFT_TAG = "1+";

    /** Tag prefixed to mapper values that originate from the right (address) table. */
    private static final String RIGHT_TAG = "2+";

    /**
     * Counts how often the output header has been written in this JVM (0 or 1).
     * NOTE(review): because this is static, the header is emitted once per JVM that runs
     * reduce calls; with several reduce tasks each output part presumably gets its own
     * header -- confirm this is the intended behavior.
     */
    private static int time = 0;

    /**
     * Decides for each input line whether it belongs to the left or the right table, splits
     * it into join column and remaining column, and emits the join column as key with the
     * remaining column (prefixed by a table tag) as value.
     */
    public static class Map extends Mapper<Object, Text, Text, Text> {

        /**
         * Emits {@code (addressID, "1+" + factoryname)} for left-table lines and
         * {@code (addressID, "2+" + addressname)} for right-table lines.
         *
         * <p>Header lines (containing {@code factoryname} or {@code addressID}) and lines
         * that cannot be split into an ID and a name are skipped.
         *
         * @param key     input key supplied by the input format; not used
         * @param value   one line of either input table
         * @param context sink for the tagged key/value pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            // The first line of each input file is a column header, not data.
            if (line.contains("factoryname") || line.contains("addressID")) {
                return;
            }

            // Locate the split point: the first digit marks the start of the address ID.
            int idStart = indexOfFirstDigit(line);
            if (idStart < 0) {
                // Blank or malformed line without a join key; nothing to emit.
                return;
            }

            if (idStart > 0) {
                // Left table: the line starts with the factory name and ends with the ID.
                int separator = line.lastIndexOf(' ', idStart - 1);
                if (separator < 0) {
                    return;
                }
                String factoryName = line.substring(0, separator).trim();
                String addressId = line.substring(idStart).trim();
                context.write(new Text(addressId), new Text(LEFT_TAG + factoryName));
            } else {
                // Right table: the line starts with the ID, followed by the address name.
                int separator = line.indexOf(' ', 1);
                if (separator < 0) {
                    return;
                }
                // Take the whole leading token so multi-digit IDs match the left table key.
                String addressId = line.substring(0, separator);
                String addressName = line.substring(separator).trim();
                context.write(new Text(addressId), new Text(RIGHT_TAG + addressName));
            }
        }

        /** Returns the index of the first character in '0'..'9', or -1 if there is none. */
        private static int indexOfFirstDigit(String line) {
            for (int i = 0; i < line.length(); i++) {
                char c = line.charAt(i);
                if (c >= '0' && c <= '9') {
                    return i;
                }
            }
            return -1;
        }
    }

    /**
     * Parses the mapper output for one address ID, collects the left-table and right-table
     * values separately, and writes their Cartesian product.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        /**
         * Writes one {@code (factoryname, addressname)} pair for every combination of a
         * left-table and a right-table record sharing {@code key}. Before the first group
         * handled in this JVM, a {@code factoryname / addressname} header row is written.
         *
         * @param key     the address ID shared by all values
         * @param values  tagged records: {@code "1+..."} for factories, anything else is
         *                treated as an address record
         * @param context sink for the joined rows
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (time == 0) {
                // Header row of the output.
                context.write(new Text("factoryname"), new Text("addressname"));
                time++;
            }

            // Growable lists: a key may carry any number of records from either table.
            List<String> factories = new ArrayList<String>();
            List<String> addresses = new ArrayList<String>();

            for (Text value : values) {
                String record = value.toString();
                if (record.length() < 2) {
                    // Too short to hold the two-character table tag; ignore.
                    continue;
                }
                String payload = record.substring(2);
                if (record.charAt(0) == '1') {
                    factories.add(payload); // left table
                } else {
                    addresses.add(payload); // right table
                }
            }

            // Cartesian product; produces nothing when either side is empty.
            for (String factory : factories) {
                for (String address : addresses) {
                    context.write(new Text(factory), new Text(address));
                }
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args generic Hadoop options followed by the input path and the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MTJoin <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "multiple table join");
        job.setJarByClass(MTJoin.class);
        job.setMapperClass(Map.class);
        // No combiner on purpose: Reduce changes the key and emits joined rows, so running
        // it on map output would discard the join key before the real reduce phase.
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

热点排行