nutch源码阅读(9)-Fetch
...................................................... for (i = 0; i < depth; i++) { // generate new segment Path[] segs = generator.generate(crawlDb, segments, -1, topN, System .currentTimeMillis()); if (segs == null) { LOG.info("Stopping at depth=" + i + " - no more URLs to fetch."); break; } //通过之前生成的segments抓取 fetcher.fetch(segs[0], threads); // fetch it if (!Fetcher.isParsing(job)) { parseSegment.parse(segs[0]); // parse it, if needed } crawlDbTool.update(crawlDb, segs, true, true); // update crawldb }......................................................
?
?
?
//threads默认通过fetcher.threads.fetch设置,也可通过参数传递 public void fetch(Path segment, int threads) throws IOException { //设置agentName //在http.agent.name中设置,会检查在http.robots.agents中是否存在 checkConfiguration(); //记录开始时间,以及segment路径 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = System.currentTimeMillis(); if (LOG.isInfoEnabled()) { LOG.info("Fetcher: starting at " + sdf.format(start)); LOG.info("Fetcher: segment: " + segment); } // set the actual time for the timelimit relative // to the beginning of the whole job and not of a specific task // otherwise it keeps trying again if a task fails long timelimit = getConf().getLong("fetcher.timelimit.mins", -1); if (timelimit != -1) { timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000); LOG.info("Fetcher Timelimit set for : " + timelimit); getConf().setLong("fetcher.timelimit", timelimit); } // Set the time limit after which the throughput threshold feature is enabled timelimit = getConf().getLong("fetcher.throughput.threshold.check.after", 10); timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000); getConf().setLong("fetcher.throughput.threshold.check.after", timelimit); int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1); if (maxOutlinkDepth > 0) { LOG.info("Fetcher: following outlinks up to depth: " + Integer.toString(maxOutlinkDepth)); int maxOutlinkDepthNumLinks = getConf().getInt("fetcher.follow.outlinks.num.links", 4); int outlinksDepthDivisor = getConf().getInt("fetcher.follow.outlinks.depth.divisor", 2); int totalOutlinksToFollow = 0; for (int i = 0; i < maxOutlinkDepth; i++) { totalOutlinksToFollow += (int)Math.floor(outlinksDepthDivisor / (i + 1) * maxOutlinkDepthNumLinks); } LOG.info("Fetcher: maximum outlinks to follow: " + Integer.toString(totalOutlinksToFollow)); } JobConf job = new NutchJob(getConf()); job.setJobName("fetch " + segment); //配置线程数 job.setInt("fetcher.threads.fetch", threads); job.set(Nutch.SEGMENT_NAME_KEY, segment.getName()); // for politeness, don't permit parallel execution of a single task job.setSpeculativeExecution(false); //配置输入 FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME)); job.setInputFormat(InputFormat.class); //配置MapRunner job.setMapRunnerClass(Fetcher.class); //配置输出 FileOutputFormat.setOutputPath(job, segment); job.setOutputFormat(FetcherOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NutchWritable.class); JobClient.runJob(job); long end = System.currentTimeMillis(); LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end)); }
?
?
public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException { //定义输出目录 Path out = FileOutputFormat.getOutputPath(job); //定义抓取的输出目录 final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name); //定义抓取内容的输出目录 final Path content = new Path(new Path(out, Content.DIR_NAME), name); //定义压缩类型 final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job); //定义输出对象 final MapFile.Writer fetchOut = new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class, compType, progress); return new RecordWriter<Text, NutchWritable>() { private MapFile.Writer contentOut; private RecordWriter<Text, Parse> parseOut; { //如果配置"fetcher.store.content"为true,则生成content if (Fetcher.isStoringContent(job)) { contentOut = new MapFile.Writer(job, fs, content.toString(), Text.class, Content.class, compType, progress); } //如果配置"fetcher.parse"为true,而抽取出外连接 if (Fetcher.isParsing(job)) { parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress); } } public void write(Text key, NutchWritable value) throws IOException { Writable w = value.get(); //?对对象类型进行判断,调用相应的抽象输出,写到不同的文件中去? if (w instanceof CrawlDatum) fetchOut.append(key, w); else if (w instanceof Content) contentOut.append(key, w); else if (w instanceof Parse) parseOut.write(key, (Parse)w); } public void close(Reporter reporter) throws IOException { fetchOut.close(); if (contentOut != null) { contentOut.close(); } if (parseOut != null) { parseOut.close(reporter); } } }; }
?
?
?
?