
Mahout Source Code Analysis: DistributedLanczosSolver (Part 7), Summary

2014-06-23 

Mahout version: 0.7, Hadoop version: 1.0.4, JDK: 1.7.0_25 64-bit.

The SVD page on the Mahout website runs its example on Amazon's cloud platform, but it does show how to invoke the algorithm. Once the eigenVectors have been computed, what should you do with them? Suppose the original data is 600*60 (600 rows, 60 columns) and the computed eigenVectors matrix is 24*60 (where 24 is some value no greater than rank). The final result is then original_data multiplied by the transpose of eigenVectors, which gives a 600*24 matrix; this accomplishes the dimensionality reduction.
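As a quick illustration of this projection (separate from the MapReduce job shown later), here is a minimal in-memory sketch using Mahout's math classes; the names ProjectionSketch, originalData, and eigenVectors are placeholders and not part of the original tool:

import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.Matrix;

public class ProjectionSketch {

  // originalData: 600 x 60, eigenVectors: 24 x 60 (each row is one eigenvector).
  // Returns a 600 x 24 matrix: every input row expressed in the eigenvector basis.
  public static Matrix reduce(Matrix originalData, Matrix eigenVectors) {
    return originalData.times(eigenVectors.transpose());
  }

  public static void main(String[] args) {
    Matrix originalData = new DenseMatrix(600, 60); // fill with real data in practice
    Matrix eigenVectors = new DenseMatrix(24, 60);  // rows = eigenvectors from the SVD job
    Matrix reduced = reduce(originalData, eigenVectors);
    System.out.println(reduced.numRows() + " x " + reduced.numCols()); // prints 600 x 24
  }
}

The MapReduce mapper shown further below does exactly this, row by row, as a dot product of each input vector with every eigenvector.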

This post introduces an SVD utility that can be used directly; it can be downloaded from http://download.csdn.net/detail/fansy1990/6479451.

The download contains three files: the synthetic_control.data data file, svd.jar, and crunch-0.5.0-incubating.jar (which has to be placed under the cluster's lib directory).

How to run: 1) Put crunch-0.5.0-incubating.jar under Hadoop's lib directory, then restart the cluster;

2) Upload synthetic_control.data to HDFS;

3) Run svd.jar, referring to the invocation sketched below; after it comes the source of the SvdReductionTranform class, which implements the dimensionality-reduction step inside the jar.
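The exact command line from the original post is not reproduced in this copy. As an assumption based on the SvdReductionTranform source that follows (the class has its own main() and defines the numCols and eigenPath options), the final dimension-reduction step could be launched roughly like this, with the HDFS paths as placeholders:

hadoop jar svd.jar mahout.fansy.svd.SvdReductionTranform \
    --input <HDFS path of the vectorized input> \
    --output <HDFS path for the reduced vectors> \
    --numCols 60 \
    --eigenPath <HDFS path of the cleaned eigenvectors>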

package mahout.fansy.svd;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import mahout.fansy.utils.read.ReadArbiKV;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

/**
 * Dimension Reduction<br>
 *
 * the last job to transform the input to the right one
 * @author fansy
 */
public class SvdReductionTranform extends AbstractJob {

  private final static String EIGENPATH = "/eigenPath";
  private final static String VECTORCOLUMN = "vectorColumn";
  private static final Logger log = LoggerFactory.getLogger(SvdReductionTranform.class);

  @Override
  public int run(String[] args) throws Exception {
    addInputOption();
    addOutputOption();
    addOption("numCols", "nc", "Number of columns of the input matrix");
    addOption("eigenPath", "e", "eigen vectors path");
    if (parseArguments(args) == null) {
      return -1;
    }
    Path input = getInputPath();
    Path output = getOutputPath();
    String eigenPath = getOption("eigenPath");
    String column = getOption("numCols");

    Configuration conf = new Configuration(getConf() != null ? getConf() : new Configuration());
    conf.set(EIGENPATH, eigenPath);
    try {
      int col = Integer.parseInt(column);
      conf.setInt(VECTORCOLUMN, col);
    } catch (Exception e) {
      return -2;  // format exception: -2
    }

    log.info("delete file " + output);
    HadoopUtil.delete(conf, output);  // delete the output path if it already exists

    Job job = new Job(conf, "prepare svd vector from " + input.toUri());
    job.setJarByClass(SvdReductionTranform.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(VectorWritable.class);

    SequenceFileInputFormat.addInputPath(job, input);
    SequenceFileOutputFormat.setOutputPath(job, output);

    job.setMapperClass(TransMapper.class);
    job.setNumReduceTasks(0);  // map-only job

    boolean succeeded = job.waitForCompletion(true);
    if (!succeeded) {
      throw new IllegalStateException("Job failed!");
    }
    return 0;
  }

  public static class TransMapper extends Mapper<LongWritable, VectorWritable, NullWritable, VectorWritable> {

    List<Vector> list = Lists.newArrayList();
    int column;
    int transCol;

    @Override
    public void setup(Context cxt) throws IOException {
      log.info("in the first row in setup()");
      column = cxt.getConfiguration().getInt(VECTORCOLUMN, -1);
      String eigenPath = cxt.getConfiguration().get(EIGENPATH);
      log.info("eigenPath:" + eigenPath);
      log.info("cxt.getConfiguration().get(\"mapred.job.tracker\")"
          + cxt.getConfiguration().get("mapred.job.tracker"));
      Map<Writable, Writable> eigenMap = null;
      try {
        eigenMap = ReadArbiKV.readFromFile(eigenPath, cxt.getConfiguration().get("mapred.job.tracker"));
      } catch (Exception e) {
        log.info("failed to read the eigenvector data?");
        // e.printStackTrace();
      }
      if (eigenMap == null) {
        // fail fast if the eigenvectors could not be read
        throw new IOException("no eigenvectors could be read from " + eigenPath);
      }
      // initialize the eigenvector list: keep only vectors whose size matches the input column count
      Iterator<Entry<Writable, Writable>> eigenIter = eigenMap.entrySet().iterator();
      while (eigenIter.hasNext()) {
        Map.Entry<Writable, Writable> set = eigenIter.next();
        VectorWritable eigenV = (VectorWritable) set.getValue();
        if (eigenV.get().size() == column) {
          list.add(eigenV.get());
        }
      }
      log.info("the last row in setup()" + list.size());
      transCol = list.size();
    }

    @Override
    public void map(LongWritable key, VectorWritable value, Context cxt)
        throws IOException, InterruptedException {
      Vector transVector = new DenseVector(transCol);
      for (int i = 0; i < transCol; i++) {
        double d = value.get().dot(list.get(i));  // dot product with the i-th eigenvector
        transVector.setQuick(i, d);
      }
      VectorWritable vector = new VectorWritable(transVector);
      cxt.write(NullWritable.get(), vector);
    }
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new SvdReductionTranform(), args);
  }
}
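When the job finishes, the output directory holds SequenceFiles of <NullWritable, VectorWritable> pairs (the key and value classes set in the job above). Below is a minimal sketch for inspecting those vectors; the output path and part-file name are placeholders that depend on your run:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.math.VectorWritable;

public class DumpReducedVectors {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path part = new Path("<output path>/part-m-00000");  // placeholder part file
    FileSystem fs = part.getFileSystem(conf);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
    try {
      NullWritable key = NullWritable.get();
      VectorWritable value = new VectorWritable();
      while (reader.next(key, value)) {
        System.out.println(value.get());  // one reduced (e.g. 24-dimensional) vector per record
      }
    } finally {
      reader.close();
    }
  }
}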


Finally, one point worth summarizing is the value of rank: it should be set to a value smaller than, but close to, the number of columns of the original data; that should give fairly good results. The number of rows of the resulting eigenVectors matrix becomes the number of columns after dimensionality reduction, and that row count is an integer no greater than rank, so rank has to be chosen with care.
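For reference, a Lanczos SVD run over the 600*60 example data might be launched as follows. The option names are those of Mahout 0.7's DistributedLanczosSolver driver as assumed here (confirm them with mahout svd --help on your installation); the paths and the rank value of 30 are placeholders:

bin/mahout svd \
    --input <HDFS path of the input vectors> \
    --output <HDFS path for the decomposition> \
    --numRows 600 \
    --numCols 60 \
    --rank 30 \
    --symmetric false \
    --cleansvd true

With cleansvd enabled, Mahout also writes a cleaned set of eigenvectors, which is what the eigenPath option of the dimension-reduction job above is meant to point at.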


Share, grow, be happy.

When reposting, please credit the blog address: http://blog.csdn.net/fansy1990


