
Mahout recommendation engine on Hadoop (Part 1)

2013-03-01 

Everything is driven by the run() method of RecommenderJob, the driver of the item-based recommender:

@Override
public int run(String[] args) throws Exception {

The method starts by loading and parsing a long list of command-line options, which we skip here.


1. The first job is run inside PreparePreferenceMatrixJob:


if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
      "--input", getInputPath().toString(),
      "--output", prepPath.toString(),
      "--maxPrefsPerUser", String.valueOf(maxPrefsPerUser),
      "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
      "--booleanData", String.valueOf(booleanData),
      "--tempDir", getTempPath().toString()});
}


Next, let's look at the implementation of the PreparePreferenceMatrixJob class in detail:


public class PreparePreferenceMatrixJob extends AbstractJob {

  @Override
  public int run(String[] args) throws Exception {
    // job 1: map the long item IDs to int indices
    Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class,
        ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class,
        VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);

    // job 2: collect each user's preferences into one vector per user
    Job toUserVectors = prepareJob(getInputPath(), getOutputPath(USER_VECTORS), TextInputFormat.class,
        ToItemPrefsMapper.class, VarLongWritable.class,
        booleanData ? VarLongWritable.class : EntityPrefWritable.class,
        ToUserVectorsReducer.class, VarLongWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);

    // job 3: transpose the user vectors into the item-by-user rating matrix
    Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS), getOutputPath(RATING_MATRIX),
        ToItemVectorsMapper.class, IntWritable.class, VectorWritable.class, ToItemVectorsReducer.class,
        IntWritable.class, VectorWritable.class);
    toItemVectors.setCombinerClass(ToItemVectorsReducer.class);

    // ... option parsing before this, and submission of the three jobs (waitForCompletion), omitted ...
  }
}


First, a quick word about the prepareJob() helper. Its parameter list is essentially:

(input path, output path, Mapper class, mapper output key class, mapper output value class, Reducer class, reducer output key class, reducer output value class)

and some overloads additionally take an InputFormat class (after the output path) and an OutputFormat class (at the end), as in the first two calls above. There is no real need to read its implementation; it simply wires up an ordinary MapReduce job.
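For readers who do want a feel for what it does, here is a minimal sketch of roughly what such a helper configures, following the parameter order above. The real AbstractJob.prepareJob() in Mahout handles more details (job naming, the jar, SequenceFile defaults, and so on), so treat this as an approximation rather than the actual code.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public final class PrepareJobSketch {

  // Approximation of AbstractJob.prepareJob(): configure one MapReduce job and hand it back.
  @SuppressWarnings("rawtypes")
  public static Job prepareJob(Configuration conf, Path inputPath, Path outputPath,
                               Class<? extends InputFormat> inputFormat,
                               Class<? extends Mapper> mapper, Class<?> mapperKey, Class<?> mapperValue,
                               Class<? extends Reducer> reducer, Class<?> reducerKey, Class<?> reducerValue,
                               Class<? extends OutputFormat> outputFormat) throws IOException {
    Job job = new Job(conf);
    job.setInputFormatClass(inputFormat);
    FileInputFormat.addInputPath(job, inputPath);      // where the mapper reads from

    job.setMapperClass(mapper);
    job.setMapOutputKeyClass(mapperKey);
    job.setMapOutputValueClass(mapperValue);

    job.setReducerClass(reducer);
    job.setOutputKeyClass(reducerKey);
    job.setOutputValueClass(reducerValue);

    job.setOutputFormatClass(outputFormat);
    FileOutputFormat.setOutputPath(job, outputPath);   // where the reducer writes to
    return job;
  }
}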


Now let's go through the concrete computation of each of these MapReduce steps, one by one:

(1) Mapper: ItemIDIndexMapper. It converts the itemid, which is a long, into an int itemid_index and emits <itemid_index, itemid> pairs, so that all the items get partitioned by index first.


public final class ItemIDIndexMapper extends
    Mapper<LongWritable, Text, VarIntWritable, VarLongWritable> {

  @Override
  protected void map(LongWritable key,
                     Text value,
                     Context context) throws IOException, InterruptedException {
    String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
    // transpose defaults to false, so the item ID is the second token of the line
    long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
    int index = TasteHadoopUtils.idToIndex(itemID);
    context.write(new VarIntWritable(index), new VarLongWritable(itemID));
  }
}
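The int index comes from TasteHadoopUtils.idToIndex(). The idea, sketched below (the exact Mahout implementation may differ slightly), is simply to hash the 64-bit ID down to a non-negative int. Because it is a hash, two different item IDs can occasionally collide on the same index, which is why the reducer below keeps only one representative item ID per index.

// Sketch of the idea behind TasteHadoopUtils.idToIndex(); the real Mahout code may differ.
static int idToIndex(long id) {
  int hash = (int) (id ^ (id >>> 32)); // same folding as Long.hashCode(id)
  return hash & 0x7FFFFFFF;            // clear the sign bit so the index is never negative
}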


Reducer: ItemIDIndexReducer. For each itemid_index it takes the minimum of all the itemids that hash to it and emits <itemid_index, minimumItemID>, so that every index ends up with a single canonical item ID.


public final class ItemIDIndexReducer extends
    Reducer<VarIntWritable, VarLongWritable, VarIntWritable, VarLongWritable> {

  // for each itemid_index, take the minimum of all the item IDs mapped to it
  // and emit <itemid_index, minimumItemID>
  @Override
  protected void reduce(VarIntWritable index,
                        Iterable<VarLongWritable> possibleItemIDs,
                        Context context) throws IOException, InterruptedException {
    long minimumItemID = Long.MAX_VALUE;
    for (VarLongWritable varLongWritable : possibleItemIDs) {
      long itemID = varLongWritable.get();
      if (itemID < minimumItemID) {
        minimumItemID = itemID;
      }
    }
    if (minimumItemID != Long.MAX_VALUE) {
      context.write(index, new VarLongWritable(minimumItemID));
    }
  }
}

(2) Mapper: ToItemPrefsMapper, which extends ToEntityPrefsMapper. It reads the raw preference data from the input file and emits it in the form

userid, <itemid, pref>

as the mapper output that the reducer will consume.


public final class ToItemPrefsMapper extends ToEntityPrefsMapper {
  public ToItemPrefsMapper() {
    super(false);
  }
}

public abstract class ToEntityPrefsMapper extends
    Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

  @Override
  public void map(LongWritable key,
                  Text value,
                  Context context) throws IOException, InterruptedException {
    String[] tokens = DELIMITER.split(value.toString());
    long userID = Long.parseLong(tokens[0]);
    long itemID = Long.parseLong(tokens[1]);
    if (itemKey ^ transpose) {
      // If using items as keys, and not transposing items and users, then users are items!
      // Or if not using items as keys (users are, as usual), but transposing items and users,
      // then users are items! Confused?
      long temp = userID;
      userID = itemID;
      itemID = temp;
    }
    if (booleanData) {
      context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
    } else {
      float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) + ratingShift : 1.0f;
      context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
    }
  }
}
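As a concrete illustration (these user IDs, item IDs and ratings are made up, booleanData is false and ratingShift is 0), a comma-separated input file and the pairs the mapper emits for it would look like this:

1,101,5.0   ->  <userID=1, EntityPrefWritable(101, 5.0)>
1,102,3.0   ->  <userID=1, EntityPrefWritable(102, 3.0)>
2,101,2.0   ->  <userID=2, EntityPrefWritable(101, 2.0)>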


Reducer: ToUserVectorsReducer. It collects all the <itemid, pref> pairs belonging to one userid, maps each itemid to its itemid_index, stores the resulting <itemid_index, pref> entries in a single vector (a RandomAccessSparseVector), and emits a <userid, vector> record.


public final class ToUserVectorsReducer extends
    Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

  @Override
  protected void reduce(VarLongWritable userID,
                        Iterable<VarLongWritable> itemPrefs,
                        Context context) throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
      int index = TasteHadoopUtils.idToIndex(itemPref.get());
      float value = itemPref instanceof EntityPrefWritable
          ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
      userVector.set(index, value);
    }
    if (userVector.getNumNondefaultElements() >= minPreferences) {
      VectorWritable vw = new VectorWritable(userVector);
      vw.setWritesLaxPrecision(true);
      context.getCounter(Counters.USERS).increment(1);
      context.write(userID, vw);
    }
  }
}
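Continuing the made-up example above (and assuming the minPreferences threshold is met), all of user 1's preferences are gathered into one sparse vector indexed by itemid_index:

<1, (101, 5.0)>, <1, (102, 3.0)>   ->   <userID=1, { idToIndex(101): 5.0, idToIndex(102): 3.0 }>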


(3) Mapper: ToItemVectorsMapper. It takes the <userid, vector> output of step (2) as its input and, for every non-zero entry of a user's vector, emits a pair <itemid_index, single-element vector keyed by the user's index>; in other words, it transposes the user vectors into item rows.


public class ToItemVectorsMapper
    extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorWritable> {

  @Override
  protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
      throws IOException, InterruptedException {
    Vector userRatings = vectorWritable.get();
    int numElementsBeforeSampling = userRatings.getNumNondefaultElements();
    userRatings = Vectors.maybeSample(userRatings, sampleSize);
    int numElementsAfterSampling = userRatings.getNumNondefaultElements();
    int column = TasteHadoopUtils.idToIndex(rowIndex.get());
    VectorWritable itemVector = new VectorWritable(new RandomAccessSparseVector(Integer.MAX_VALUE, 1));
    itemVector.setWritesLaxPrecision(true);
    Iterator<Vector.Element> iterator = userRatings.iterateNonZero();
    while (iterator.hasNext()) {
      Vector.Element elem = iterator.next();
      itemVector.get().setQuick(column, elem.get());
      ctx.write(new IntWritable(elem.index()), itemVector);
    }
    ctx.getCounter(Elements.USER_RATINGS_USED).increment(numElementsAfterSampling);
    ctx.getCounter(Elements.USER_RATINGS_NEGLECTED).increment(numElementsBeforeSampling - numElementsAfterSampling);
  }
}
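With the same made-up ratings, user 1's vector is broken apart again: for every non-zero entry the mapper emits a single-element vector whose key is the item's index and whose only column is the user's index, i.e. the transpose of the user vector:

<userID=1, { idToIndex(101): 5.0, idToIndex(102): 3.0 }>
    ->  <idToIndex(101), { idToIndex(1): 5.0 }>
        <idToIndex(102), { idToIndex(1): 3.0 }>

The ToItemVectorsReducer, which the job setup above also registers as the combiner, then merges the single-element vectors coming from different users into one rating vector per item, and that is what gets written out as RATING_MATRIX.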

