MapReduce框架中PageRank算法的简单实现
主要实现思想在另一篇博客中已经提到:
?
具体实现每次迭代包括两个Job
第一个分散各个节点的PR值
?
第二个用于将dangling节点的PR值分散到其它节点
?
主要包括5个类
PageRankNode:图中的节点类-代表一个页面
PageRankJob:实现分散各个节点的PR值的类
DistributionPRMass:实现dangling节点的PR值分散到其它节点的Job类
RangePartitioner:partition类,将连续的节点分配到同一个reduce中
PageRankDirver:整个工作的驱动类(主函数)
?
package com.zxx.PageRank;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;public class PageRankJob{public static final double d = 0.85;private static final double nodecount = 10;private static final double threshold=0.01;//收敛邻接点public static enum MidNodes{ // 记录已经收敛的个数Map, Reduce};public static class PageRankMaper extends Mapper<Object, Text, Text, Text>{@Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException{PageRankNode node = PageRankNode.InstanceFormString(value.toString());node.setOldPR(node.getNewPR());context.write(new Text(node.getId()), new Text(PageRankNode.toStringWithOutID(node)));for (String str : node.getDestNodes()){String outPR = new Double(node.getNewPR() / (double)node.getNumDest()).toString();context.write(new Text(str), new Text(outPR));}}}public static class PageRankJobReducer extends Reducer<Text, Text, Text, Text>{private double totalMass = Double.NEGATIVE_INFINITY; // 缓存每个key从其它点得到的全部PR值private double missMass=Double.NEGATIVE_INFINITY;@Overridepublic void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{PageRankNode currentNode = new PageRankNode(key.toString());double inPR = 0.0;for (Text val : values){String[] temp = val.toString().trim().split("\\s+");if (temp.length == 1) // 此时候只输出一个PR值{inPR += Double.valueOf(temp[0]);} else if (temp.length >= 4){// 此时输出的是含有邻接点的节点全信息currentNode = PageRankNode.InstanceFormString(key.toString() + "\t" + val.toString());} else if (temp.length == 3){ // 此时输出的点没有出度context.getCounter("PageRankJobReducer", "errornode").increment(1);currentNode=PageRankNode.InstanceFormString(key.toString() + "\t" + val.toString());}} if (currentNode.getNumDest()>=1){ double 
newPRofD = (1 - PageRankJob.d) /(double) PageRankJob.nodecount + PageRankJob.d * inPR; currentNode.setNewPR(newPRofD); context.write(new Text(currentNode.getId()), new Text(PageRankNode.toStringWithOutID(currentNode)));}else if (currentNode.getNumDest()==0) {missMass=currentNode.getOldPR();//得到dangling节点的上一次的PR值,传播到下一个分布Pr的job}totalMass += inPR;double partPR=(currentNode.getNewPR()-currentNode.getOldPR())*(currentNode.getNewPR()-currentNode.getOldPR());if (partPR<=threshold){context.getCounter(MidNodes.Reduce).increment(1);}}@Overridepublic void cleanup(Context context) throws IOException, InterruptedException{// 将total记录到文件中Configuration conf = context.getConfiguration();String taskId = conf.get("mapred.task.id");String path = conf.get("PageRankMassPath");// 注意此处的path路径设置------------------ if (missMass==Double.NEGATIVE_INFINITY){return;}FileSystem fs = FileSystem.get(context.getConfiguration());FSDataOutputStream out = fs.create(new Path(path + "/"+"missMass"), false);out.writeDouble(missMass);out.close();}}}?
package com.zxx.PageRank;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Mapper.Context;import com.zxx.Graph.ArrayListOfInts;import com.zxx.Graph.BFSNode;import com.zxx.Graph.HMapII;import com.zxx.Graph.MapII;import com.zxx.Graph.ReachableNodes;public class DistributionPRMass{public class GraphMapper extends Mapper<Object, Text, Text, Text>{private double missingMass = 0.0; private int nodeCnt = 0;@Overridepublic void setup(Context context) throws IOException, InterruptedException{Configuration conf = context.getConfiguration(); missingMass = (double)conf.getFloat("MissingMass", 0.0f);//该值等于1-totalMass nodeCnt = conf.getInt("NodeCount", 0);}@Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException{PageRankNode currentNode=PageRankNode.InstanceFormString(value.toString().trim());currentNode.setOldPR(currentNode.getNewPR());double p=currentNode.getNewPR();double pnew=(1-PageRankJob.d)/(double)(nodeCnt-1)+PageRankJob.d*missingMass/(double)(nodeCnt-1);//double pnew=missingMass/(double)(nodeCnt-1);currentNode.setNewPR(p+pnew);context.write(new Text(currentNode.getId()), new Text(PageRankNode.toStringWithOutID(currentNode)));}@Overridepublic void cleanup(Context context) throws IOException, InterruptedException{}}}?
package com.zxx.PageRank;import org.apache.hadoop.conf.Configurable;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Partitioner;public class RangePartitioner extends Partitioner<Text, Text> implements Configurable{private int nodeCnt = 0;private Configuration conf;public RangePartitioner() {}@Overridepublic Configuration getConf(){return conf;}@Overridepublic void setConf(Configuration arg0){this.conf = arg0; configure();}@Overridepublic int getPartition(Text arg0, Text arg1, int arg2){return (int) ((float)(Integer.parseInt(arg0.toString()) / (float) nodeCnt) * arg2) % arg2;}private void configure() //获得节点的总数{nodeCnt = conf.getInt("NodeCount", 0);}}?
?
?
package com.zxx.PageRank;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counters;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class PageRankDirver{ public static final int numNodes=5; //节点数 public static final int maxiter=10; //最大收敛次数public static void main(String[] args) throws Exception{long count=0; //缓存已经接近收敛的节点个数int it=1;int num=1;String input="/Graph/input/";String output="/Graph/output1";do{Job job=getPageRankJob(input, output);job.waitForCompletion(true);Counters counter = job.getCounters();count = counter.findCounter(PageRankJob.MidNodes.Reduce).getValue();input="/Graph/output"+it;it++;output="/Graph/output"+it; Job job1=getDistrbuteJob(input,output); job1.waitForCompletion(true); input="/Graph/output"+it;it++;output="/Graph/output"+it;if(num<maxiter)System.out.println("it:"+it+" "+count);num++;}while(count!=numNodes);}public static Job getPageRankJob(String inPath,String outPath) throws Exception{Configuration conf = new Configuration();Job job=new Job(conf,"PageRank job");job.getConfiguration().setInt("NodeCount", numNodes); job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); job.getConfiguration().set("PageRankMassPath", "/mass"); 
job.setJarByClass(PageRankDirver.class);job.setNumReduceTasks(5);job.setMapperClass(PageRankJob.PageRankMaper.class);job.setReducerClass(PageRankJob.PageRankJobReducer.class);job.setPartitionerClass(RangePartitioner.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(inPath));FileOutputFormat.setOutputPath(job, new Path(outPath));FileSystem.get(job.getConfiguration()).delete(new Path(outPath), true);//如果文件已存在删除return job;}public static Job getDistrbuteJob(String inPath,String outPath) throws Exception{Configuration conf = new Configuration();Job job=new Job(conf,"Ditribute job");double mass = Double.NEGATIVE_INFINITY; //一下是读取dangling节点的PR值,将其分配到其他节点 FileSystem fs = FileSystem.get(conf); for (FileStatus f : fs.listStatus(new Path("/mass/missMass"))) { FSDataInputStream fin = fs.open(f.getPath()); mass = fin.readDouble(); fin.close(); } job.getConfiguration().setFloat("MissingMass",(float)mass);job.getConfiguration().setInt("NodeCount", numNodes);job.getConfiguration().setInt("NodeCount", numNodes); job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false); job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false); job.getConfiguration().set("PageRankMassPath", "/mass"); job.setJarByClass(PageRankDirver.class);job.setNumReduceTasks(5);job.setMapperClass(PageRankJob.PageRankMaper.class);job.setReducerClass(PageRankJob.PageRankJobReducer.class);job.setPartitionerClass(RangePartitioner.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(inPath));FileOutputFormat.setOutputPath(job, new Path(outPath));FileSystem.get(job.getConfiguration()).delete(new Path(outPath), true);//如果文件已存在删除return job;}}??
?
?
package com.zxx.PageRank;import java.util.*;import javax.naming.spi.DirStateFactory.Result;import javax.xml.soap.Node;public class PageRankNode{private String id;private List<String> destNodes=new ArrayList<String>(); private double oldPR; private double newPR; private int numDest; public PageRankNode() { } public PageRankNode(String id) { this.id=id; } public static String toStringWithOutID(PageRankNode node) { StringBuffer temp=new StringBuffer(); temp.append(node.getOldPR()); temp.append("\t"+node.getNewPR()); temp.append("\t"+node.getNumDest()); for(String dest:node.getDestNodes()) { temp.append("\t"+dest); } return temp.toString(); } public static PageRankNode InstanceFormString(String nodeStr) { PageRankNode node=new PageRankNode(); String[] res=nodeStr.split("\\s+"); node.setId(res[0]); if (res.length==2) { node.setNewPR(Double.valueOf(res[1])); }else if (res.length>4){node.setOldPR(Double.valueOf(res[1]));node.setNewPR(Double.valueOf(res[2]));node.setNumDest(Integer.valueOf(res[3]));for (int i = 4; i < res.length; i++){node.getDestNodes().add(res[i]);}assert(node.getNumDest()==node.getDestNodes().size());} return node; }public String getId(){return id;}public void setId(String id){this.id = id;}public List<String> getDestNodes(){return destNodes;}public void setDestNodes(List<String> destNodes){this.destNodes = destNodes;}public double getOldPR(){return oldPR;}public void setOldPR(double oldPR){this.oldPR = oldPR;}public double getNewPR(){return newPR;}public void setNewPR(double newPR){this.newPR = newPR;}public int getNumDest(){return numDest;}public void setNumDest(int numDest){this.numDest = numDest;} } 3 楼 schaha123 2012-05-18 谢谢啊,这几天仔细研究一下。