首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

nutch源码翻阅(6)-Generator

2013-06-26 
nutch源码阅读(6)-Generator?for (i 0 i depth i++) {// generate new segment//根据传入参数depth

nutch源码阅读(6)-Generator

?

for (i = 0; i < depth; i++) {                  // generate new segment     //根据传入参数depth来决定循环次数,生成segment      Path[] segs = generator.generate(crawlDb, segments, -1, topN, System          .currentTimeMillis());      if (segs == null) {        LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");        break;      }      fetcher.fetch(segs[0], threads);  // fetch it      if (!Fetcher.isParsing(job)) {        parseSegment.parse(segs[0]);    // parse it, if needed      }      crawlDbTool.update(crawlDb, segs, true, true); // update crawldb    }

?

?

?

?

  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,      long curTime, boolean filter, boolean norm, boolean force, int maxNumSegments)      throws IOException {    //生成临时存储路径    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"        + System.currentTimeMillis());    //生成文件锁    Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);    FileSystem fs = FileSystem.get(getConf());    LockUtil.createLockFile(fs, lock, force);    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");    long start = System.currentTimeMillis();    LOG.info("Generator: starting at " + sdf.format(start));    LOG.info("Generator: Selecting best-scoring urls due for fetch.");    LOG.info("Generator: filtering: " + filter);    LOG.info("Generator: normalizing: " + norm);    if (topN != Long.MAX_VALUE) {      LOG.info("Generator: topN: " + topN);    }        if ("true".equals(getConf().get(GENERATE_MAX_PER_HOST_BY_IP))){      LOG.info("Generator: GENERATE_MAX_PER_HOST_BY_IP will be ignored, use partition.url.mode instead");    }    // map to inverted subset due for fetch, sort by score    JobConf job = new NutchJob(getConf());    job.setJobName("generate: select from " + dbDir);        //用户如果没有指定的话,就默认为map的数量    if (numLists == -1) { // for politeness make      numLists = job.getNumMapTasks(); // a partition per fetch task    }    如果mapreduce设置为local,就只用一个mapper    if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {      // override      LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");      numLists = 1;    }    设置生成时间    job.setLong(GENERATOR_CUR_TIME, curTime);    // record real generation time    long generateTime = System.currentTimeMillis();    job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);    job.setLong(GENERATOR_TOP_N, topN);    job.setBoolean(GENERATOR_FILTER, filter);    job.setBoolean(GENERATOR_NORMALISE, norm);    job.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments);    配置作业信息    FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));    job.setInputFormat(SequenceFileInputFormat.class);    job.setMapperClass(Selector.class);    job.setPartitionerClass(Selector.class);    job.setReducerClass(Selector.class);    FileOutputFormat.setOutputPath(job, tempDir);    job.setOutputFormat(SequenceFileOutputFormat.class);    job.setOutputKeyClass(FloatWritable.class);    job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);    job.setOutputValueClass(SelectorEntry.class);    job.setOutputFormat(GeneratorOutputFormat.class);    try {      JobClient.runJob(job);    } catch (IOException e) {      throw e;    }     ...................     ...................     ...................  }

?

  /** Select & invert subset due for fetch. */    public void map(Text key, CrawlDatum value,        OutputCollector<FloatWritable,SelectorEntry> output, Reporter reporter)        throws IOException {      Text url = key;      //如果有filter设置,先对url进行过滤      if (filter) {        // If filtering is on don't generate URLs that don't pass        // URLFilters        try {          if (filters.filter(url.toString()) == null) return;        } catch (URLFilterException e) {          if (LOG.isWarnEnabled()) {            LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");          }        }      }      CrawlDatum crawlDatum = value;      // check fetch schedule      //检查抓取时间,没有达到抓取时间就过滤掉      if (!schedule.shouldFetch(url, crawlDatum, curTime)) {        LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="            + crawlDatum.getFetchTime() + ", curTime=" + curTime);        return;      }      LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData().get(          Nutch.WRITABLE_GENERATE_TIME_KEY);      if (oldGenTime != null) { // awaiting fetch & update        if (oldGenTime.get() + genDelay > curTime) // still wait for        // update        return;      }            //计算得分      float sort = 1.0f;      try {        sort = scfilters.generatorSortValue((Text) key, crawlDatum, sort);      } catch (ScoringFilterException sfe) {        if (LOG.isWarnEnabled()) {          LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);        }      }            if (restrictStatus != null        && !restrictStatus.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) return;      // consider only entries with a score superior to the threshold      //如果分值小于阀值,过滤掉      if (scoreThreshold != Float.NaN && sort < scoreThreshold) return;      // consider only entries with a retry (or fetch) interval lower than threshold      if (intervalThreshold != -1 && crawlDatum.getFetchInterval() > intervalThreshold) return;      // sort by decreasing score, using DecreasingFloatComparator      sortValue.set(sort);      // record generation time      //记录生成时间      crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);      entry.datum = crawlDatum;      entry.url = (Text) key;      output.collect(sortValue, entry); // invert for sort by score    }

?

  /** Partition by host / domain or IP. */    //根据domain或ip 来分配给reduce    public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {      return partitioner.getPartition(((SelectorEntry) value).url, key, numReduceTasks);    }

?

? ??

?

public void configure(JobConf job) {      curTime = job.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());      limit = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE) / job.getNumReduceTasks();      maxCount = job.getInt(GENERATOR_MAX_COUNT, -1);      // back compatibility with old param      int oldMaxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);      if (maxCount==-1 && oldMaxPerHost!=-1){        maxCount = oldMaxPerHost;        byDomain = false;      }      if (GENERATOR_COUNT_VALUE_DOMAIN.equals(job.get(GENERATOR_COUNT_MODE))) byDomain = true;      filters = new URLFilters(job);      normalise = job.getBoolean(GENERATOR_NORMALISE, true);      if (normalise) normalizers = new URLNormalizers(job,          URLNormalizers.SCOPE_GENERATE_HOST_COUNT);      scfilters = new ScoringFilters(job);      partitioner.configure(job);      filter = job.getBoolean(GENERATOR_FILTER, true);      genDelay = job.getLong(GENERATOR_DELAY, 7L) * 3600L * 24L * 1000L;      long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);      if (time > 0) genTime.set(time);      schedule = FetchScheduleFactory.getFetchSchedule(job);      scoreThreshold = job.getFloat(GENERATOR_MIN_SCORE, Float.NaN);      intervalThreshold = job.getInt(GENERATOR_MIN_INTERVAL, -1);      restrictStatus = job.get(GENERATOR_RESTRICT_STATUS, null);      maxNumSegments = job.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);      segCounts = new int[maxNumSegments];    }

?

?

    /** Collect until limit is reached. */    public void reduce(FloatWritable key, Iterator<SelectorEntry> values,        OutputCollector<FloatWritable,SelectorEntry> output, Reporter reporter)        throws IOException {      while (values.hasNext()) {        if (count == limit) {          // do we have any segments left?          if (currentsegmentnum < maxNumSegments) {            count = 0;            currentsegmentnum++;          } else break;        }        SelectorEntry entry = values.next();        Text url = entry.url;        String urlString = url.toString();        URL u = null;        String hostordomain = null;        try {          if (normalise && normalizers != null) {            urlString = normalizers.normalize(urlString,                URLNormalizers.SCOPE_GENERATE_HOST_COUNT);          }          u = new URL(urlString);          if (byDomain) {            hostordomain = URLUtil.getDomainName(u);          } else {            hostordomain = new URL(urlString).getHost();          }        } catch (Exception e) {          LOG.warn("Malformed URL: '" + urlString + "', skipping ("              + StringUtils.stringifyException(e) + ")");          reporter.getCounter("Generator", "MALFORMED_URL").increment(1);          continue;        }        hostordomain = hostordomain.toLowerCase();        // only filter if we are counting hosts or domains        if (maxCount > 0) {          int[] hostCount = hostCounts.get(hostordomain);          if (hostCount == null) {            hostCount = new int[] {1, 0};            hostCounts.put(hostordomain, hostCount);          }          // increment hostCount          hostCount[1]++;          // check if topN reached, select next segment if it is          while (segCounts[hostCount[0]-1] >= limit && hostCount[0] < maxNumSegments) {            hostCount[0]++;            hostCount[1] = 0;          }          // reached the limit of allowed URLs per host / domain          // see if we can put it in the next segment?          if (hostCount[1] >= maxCount) {            if (hostCount[0] < maxNumSegments) {              hostCount[0]++;              hostCount[1] = 0;            } else {              if (hostCount[1] == maxCount + 1 && LOG.isInfoEnabled()) {                LOG.info("Host or domain " + hostordomain + " has more than " + maxCount                    + " URLs for all " + maxNumSegments + " segments. Additional URLs won't be included in the fetchlist.");              }              // skip this entry              continue;            }          }          entry.segnum = new IntWritable(hostCount[0]);          segCounts[hostCount[0]-1]++;        } else {          entry.segnum = new IntWritable(currentsegmentnum);          segCounts[currentsegmentnum-1]++;        }        output.collect(key, entry);        // Count is incremented only when we keep the URL        // maxCount may cause us to skip it.        count++;      }    }  }

?

? ?可以通过上述代码看出,Generator的第一个Job,实现的逻辑如下:

? ? ? ? ? ? ? ? ? ?1.根据条件过滤不满足的

? ? ? ? ? ? ? ? ? ?2.根据配置生成相应数量的segments

? ? ? ? ? ? ? ? ? ?3.计算出每个url所属的segments

?

?

?

?

?

热点排行