Mahout Recommendation Engine on Hadoop (Part 1)
The analysis starts from the run() method of the driver class (RecommenderJob):

@Override public int run(String[] args) throws Exception {

The method begins by loading a long list of assorted options, which we skip over.

1. The first Job is launched inside PreparePreferenceMatrixJob:
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
      "--input", getInputPath().toString(),
      "--output", prepPath.toString(),
      "--maxPrefsPerUser", String.valueOf(maxPrefsPerUser),
      "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
      "--booleanData", String.valueOf(booleanData),
      "--tempDir", getTempPath().toString()
  });
}
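For context, the driver itself can be launched programmatically in the same ToolRunner style. A minimal sketch; the HDFS paths are placeholders, and SIMILARITY_COOCCURRENCE is just one of the supported --similarityClassname values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;

public class RunRecommender {
  public static void main(String[] args) throws Exception {
    // Launch the whole recommender pipeline; placeholder paths.
    ToolRunner.run(new Configuration(), new RecommenderJob(), new String[]{
        "--input", "/user/hadoop/prefs.csv",
        "--output", "/user/hadoop/recommendations",
        "--similarityClassname", "SIMILARITY_COOCCURRENCE",
        "--tempDir", "/user/hadoop/temp"
    });
  }
}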
Below is a detailed analysis of how the PreparePreferenceMatrixJob class is implemented:
public class PreparePreferenceMatrixJob extends AbstractJob {

  @Override
  public int run(String[] args) throws Exception {
    // convert the long itemIDs into int indexes
    Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class,
        ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
        ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
        SequenceFileOutputFormat.class);

    Job toUserVectors = prepareJob(getInputPath(), getOutputPath(USER_VECTORS), TextInputFormat.class,
        ToItemPrefsMapper.class, VarLongWritable.class,
        booleanData ? VarLongWritable.class : EntityPrefWritable.class,
        ToUserVectorsReducer.class, VarLongWritable.class, VectorWritable.class,
        SequenceFileOutputFormat.class);

    Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS), getOutputPath(RATING_MATRIX),
        ToItemVectorsMapper.class, IntWritable.class, VectorWritable.class,
        ToItemVectorsReducer.class, IntWritable.class, VectorWritable.class);
    toItemVectors.setCombinerClass(ToItemVectorsReducer.class);
    // ... (job submission and the rest of the method omitted)
  }
}
First, a note on the prepareJob() helper and its parameter list. The long overload used above takes:

(input path, output path, input format, Mapper class, Mapper output key, Mapper output value, Reducer class, Reducer output key, Reducer output value, output format)

while the shorter overload used for toItemVectors lets the input and output formats default. There is no need to read the body of this function: it simply configures and returns one MapReduce job.
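For a mental model, here is an illustrative sketch of what a helper like this has to do; the real AbstractJob.prepareJob differs in details (job naming, defaults, and so on):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative sketch only, not Mahout's actual implementation.
public final class JobSketch {
  public static Job prepareJob(Configuration conf, Path input, Path output,
      Class<? extends InputFormat> inputFormat,
      Class<? extends Mapper> mapper, Class<? extends Writable> mapperKey,
      Class<? extends Writable> mapperValue,
      Class<? extends Reducer> reducer, Class<? extends Writable> reducerKey,
      Class<? extends Writable> reducerValue,
      Class<? extends OutputFormat> outputFormat) throws IOException {
    Job job = new Job(new Configuration(conf));
    job.setJarByClass(mapper);               // ship the jar containing the user code
    job.setInputFormatClass(inputFormat);
    FileInputFormat.addInputPath(job, input);
    job.setMapperClass(mapper);
    job.setMapOutputKeyClass(mapperKey);     // mapper output types
    job.setMapOutputValueClass(mapperValue);
    job.setReducerClass(reducer);
    job.setOutputKeyClass(reducerKey);       // final (reducer) output types
    job.setOutputValueClass(reducerValue);
    job.setOutputFormatClass(outputFormat);
    FileOutputFormat.setOutputPath(job, output);
    return job;
  }
}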
Next, the concrete computation of each of these MapReduce stages, one by one:
(1) Mapper: ItemIDIndexMapper converts each itemID of type long into an int itemid_index and emits <itemid_index, itemID>; in this way all items are first split up by index.
public final class ItemIDIndexMapper
    extends Mapper<LongWritable, Text, VarIntWritable, VarLongWritable> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
    long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
    int index = TasteHadoopUtils.idToIndex(itemID);
    context.write(new VarIntWritable(index), new VarLongWritable(itemID));
  }
}
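The actual conversion happens in TasteHadoopUtils.idToIndex(). In the Mahout source this is essentially a hash of the long ID masked into the non-negative int range (paraphrased here from memory, so treat the exact expression as approximate):

import com.google.common.primitives.Longs;

// Paraphrased from org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils:
// hash the 64-bit ID, then mask off the sign bit so the index is a
// non-negative int that can serve as a vector dimension.
public final class IdToIndexSketch {
  public static int idToIndex(long itemID) {
    return 0x7FFFFFFF & Longs.hashCode(itemID);
  }
}

Since 2^64 possible IDs are folded into 2^31 indexes, two distinct itemIDs can land on the same index, which is why the reducer below has to pick a canonical one.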
Reducer: ItemIDIndexReducer takes, for each itemid_index, the minimum of all itemIDs that mapped to it, and emits <itemid_index, minimumItemID>. Because the hash above can send different itemIDs to the same index, keeping the smallest one makes the index-to-ID mapping deterministic.
public final class ItemIDIndexReducer
    extends Reducer<VarIntWritable, VarLongWritable, VarIntWritable, VarLongWritable> {

  // for each itemid_index, take the minimum of all itemIDs and emit <itemid_index, minimumItemID>
  @Override
  protected void reduce(VarIntWritable index, Iterable<VarLongWritable> possibleItemIDs, Context context)
      throws IOException, InterruptedException {
    long minimumItemID = Long.MAX_VALUE;
    for (VarLongWritable varLongWritable : possibleItemIDs) {
      long itemID = varLongWritable.get();
      if (itemID < minimumItemID) {
        minimumItemID = itemID;
      }
    }
    if (minimumItemID != Long.MAX_VALUE) {
      context.write(index, new VarLongWritable(minimumItemID));
    }
  }
}

(2) Mapper: ToItemPrefsMapper, which extends ToEntityPrefsMapper, reads the raw preference data from the input file and emits records of the form

    userID, <itemID, pref>

as the mapper output (i.e., the input to the reducer).
public final class ToItemPrefsMapper extends ToEntityPrefsMapper {
  public ToItemPrefsMapper() {
    super(false);
  }
}

public abstract class ToEntityPrefsMapper
    extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] tokens = DELIMITER.split(value.toString());
    long userID = Long.parseLong(tokens[0]);
    long itemID = Long.parseLong(tokens[1]);
    if (itemKey ^ transpose) {
      // If using items as keys, and not transposing items and users, then users are items!
      // Or if not using items as keys (users are, as usual), but transposing items and users,
      // then users are items! Confused?
      long temp = userID;
      userID = itemID;
      itemID = temp;
    }
    if (booleanData) {
      context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
    } else {
      float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) + ratingShift : 1.0f;
      context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
    }
  }
}
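To make the contract concrete, here is a hypothetical trace (line format userID,itemID,pref, with ratingShift assumed to be 0):

input line:           123,456,4.5
normal mode emits:    <123, EntityPrefWritable(456, 4.5)>
--booleanData emits:  <123, 456>   (the rating column is ignored)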
Reducer: ToUserVectorsReducer collects all the <itemID, pref> pairs under one userID, maps each itemID to its itemid_index, combines it with the pref into an <itemid_index, pref> pair, and stores all such pairs for the user in a single vector (of type RandomAccessSparseVector). It then emits results of the form <userID, vector>.
public final class ToUserVectorsReducer
    extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

  @Override
  protected void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context)
      throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
      int index = TasteHadoopUtils.idToIndex(itemPref.get());
      float value = itemPref instanceof EntityPrefWritable
          ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
      userVector.set(index, value);
    }
    if (userVector.getNumNondefaultElements() >= minPreferences) {
      VectorWritable vw = new VectorWritable(userVector);
      vw.setWritesLaxPrecision(true);
      context.getCounter(Counters.USERS).increment(1);
      context.write(userID, vw);
    }
  }
}
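The USER_VECTORS output is a SequenceFile of <VarLongWritable, VectorWritable> pairs, so it can be inspected with a few lines of plain Hadoop code. A minimal sketch; the path below (including the part-file name) is an assumption about where --tempDir put the output, so adjust it to your cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class DumpUserVectors {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed location of the userVectors SequenceFile under --tempDir.
    Path path = new Path("temp/preparePreferenceMatrix/userVectors/part-r-00000");
    SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
    VarLongWritable userID = new VarLongWritable();
    VectorWritable vector = new VectorWritable();
    while (reader.next(userID, vector)) {
      System.out.println(userID.get() + " -> " + vector.get());
    }
    reader.close();
  }
}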
(3) Mapper: ToItemVectorsMapper takes the output of (2) above as its input and transposes it: for every item in a user's vector it emits <itemid_index, vector>, where the emitted vector carries the user's rating at that user's own column index. The ToItemVectorsReducer configured above (also registered as the combiner) then merges these single-entry vectors into the rows of the item-user rating matrix.
public class ToItemVectorsMapper
    extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorWritable> {

  @Override
  protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
      throws IOException, InterruptedException {
    Vector userRatings = vectorWritable.get();
    int numElementsBeforeSampling = userRatings.getNumNondefaultElements();
    userRatings = Vectors.maybeSample(userRatings, sampleSize);
    int numElementsAfterSampling = userRatings.getNumNondefaultElements();
    int column = TasteHadoopUtils.idToIndex(rowIndex.get());
    VectorWritable itemVector = new VectorWritable(new RandomAccessSparseVector(Integer.MAX_VALUE, 1));
    itemVector.setWritesLaxPrecision(true);
    Iterator<Vector.Element> iterator = userRatings.iterateNonZero();
    while (iterator.hasNext()) {
      Vector.Element elem = iterator.next();
      itemVector.get().setQuick(column, elem.get());
      ctx.write(new IntWritable(elem.index()), itemVector);
    }
    ctx.getCounter(Elements.USER_RATINGS_USED).increment(numElementsAfterSampling);
    ctx.getCounter(Elements.USER_RATINGS_NEGLECTED)
        .increment(numElementsBeforeSampling - numElementsAfterSampling);
  }
}
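The excerpt stops at the mapper, but the job setup earlier registers ToItemVectorsReducer as both reducer and combiner. Conceptually it just folds the single-entry vectors for each item index into one sparse row; the following is a simplified sketch under that reading, not the exact Mahout source (which delegates to a vector-merging helper), and the class name is hypothetical:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

// Simplified sketch of the merge step: combine all partial item vectors
// for one item index into a single sparse row of the rating matrix.
public final class MergeItemVectorsReducer
    extends Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {

  @Override
  protected void reduce(IntWritable itemIndex, Iterable<VectorWritable> partialVectors, Context ctx)
      throws IOException, InterruptedException {
    Vector merged = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VectorWritable partial : partialVectors) {
      // copy each user's rating into the merged item row
      Iterator<Vector.Element> it = partial.get().iterateNonZero();
      while (it.hasNext()) {
        Vector.Element e = it.next();
        merged.setQuick(e.index(), e.get());
      }
    }
    VectorWritable result = new VectorWritable(merged);
    result.setWritesLaxPrecision(true);
    ctx.write(itemIndex, result);
  }
}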