`

nutch源码阅读(6)-Generator

阅读更多

 

for (i = 0; i < depth; i++) {             
     // generate new segment
     //根据传入参数depth来决定循环次数,生成segment
      Path[] segs = generator.generate(crawlDb, segments, -1, topN, System
          .currentTimeMillis());
      if (segs == null) {
        LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
        break;
      }
      fetcher.fetch(segs[0], threads);  // fetch it
      if (!Fetcher.isParsing(job)) {
        parseSegment.parse(segs[0]);    // parse it, if needed
      }
      crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
    }

 

 

 

 

  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
      long curTime, boolean filter, boolean norm, boolean force, int maxNumSegments)
      throws IOException {
    //生成临时存储路径
    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"
        + System.currentTimeMillis());
    //生成文件锁
    Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
    FileSystem fs = FileSystem.get(getConf());
    LockUtil.createLockFile(fs, lock, force);

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    LOG.info("Generator: starting at " + sdf.format(start));
    LOG.info("Generator: Selecting best-scoring urls due for fetch.");
    LOG.info("Generator: filtering: " + filter);
    LOG.info("Generator: normalizing: " + norm);
    if (topN != Long.MAX_VALUE) {
      LOG.info("Generator: topN: " + topN);
    }
    
    if ("true".equals(getConf().get(GENERATE_MAX_PER_HOST_BY_IP))){
      LOG.info("Generator: GENERATE_MAX_PER_HOST_BY_IP will be ignored, use partition.url.mode instead");
    }

    // map to inverted subset due for fetch, sort by score
    JobConf job = new NutchJob(getConf());
    job.setJobName("generate: select from " + dbDir);
    
    //用户如果没有指定的话,就默认为map的数量
    if (numLists == -1) { // for politeness make
      numLists = job.getNumMapTasks(); // a partition per fetch task
    }
    如果mapreduce设置为local,就只用一个mapper
    if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
      // override
      LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");
      numLists = 1;
    }
    设置生成时间
    job.setLong(GENERATOR_CUR_TIME, curTime);
    // record real generation time
    long generateTime = System.currentTimeMillis();
    job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
    job.setLong(GENERATOR_TOP_N, topN);
    job.setBoolean(GENERATOR_FILTER, filter);
    job.setBoolean(GENERATOR_NORMALISE, norm);
    job.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments);
    配置作业信息
    FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(Selector.class);
    job.setPartitionerClass(Selector.class);
    job.setReducerClass(Selector.class);

    FileOutputFormat.setOutputPath(job, tempDir);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
    job.setOutputValueClass(SelectorEntry.class);
    job.setOutputFormat(GeneratorOutputFormat.class);

    try {
      JobClient.runJob(job);
    } catch (IOException e) {
      throw e;
    }

     ...................
     ...................
     ...................
  }

 

  /**
   * Select and invert the subset of crawldb entries due for fetch.
   *
   * <p>An entry is dropped when the URL filters reject it (if filtering is
   * enabled), when the fetch schedule says it is not due, when it was already
   * generated less than {@code genDelay} ago, when its status does not match
   * {@code restrictStatus}, when its sort value is below
   * {@code scoreThreshold}, or when its fetch interval exceeds
   * {@code intervalThreshold}. Surviving entries are emitted keyed by their
   * sort value so they can be ordered by score.
   *
   * @param key the URL
   * @param value the crawldb entry for that URL
   * @param output receives (sort value, entry) pairs
   * @param reporter not used by this method
   * @throws IOException if collecting the output fails
   */
    public void map(Text key, CrawlDatum value,
        OutputCollector<FloatWritable,SelectorEntry> output, Reporter reporter)
        throws IOException {
      Text url = key;
      // If filtering is on, don't generate URLs that don't pass the URLFilters.
      if (filter) {
        try {
          if (filters.filter(url.toString()) == null) {
            return;
          }
        } catch (URLFilterException e) {
          // A filter failure is only logged; the URL is still processed below.
          if (LOG.isWarnEnabled()) {
            LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
          }
        }
      }
      CrawlDatum crawlDatum = value;

      // Check the fetch schedule: skip entries that are not yet due.
      if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
              + crawlDatum.getFetchTime() + ", curTime=" + curTime);
        }
        return;
      }

      LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData().get(
          Nutch.WRITABLE_GENERATE_TIME_KEY);
      if (oldGenTime != null) { // awaiting fetch & update
        if (oldGenTime.get() + genDelay > curTime) {
          // still waiting for the update of a previous generate run
          return;
        }
      }

      // Compute the sort value (score); it stays at 1.0 if the scoring filters fail.
      float sort = 1.0f;
      try {
        sort = scfilters.generatorSortValue(key, crawlDatum, sort);
      } catch (ScoringFilterException sfe) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
        }
      }

      if (restrictStatus != null
          && !restrictStatus.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) {
        return;
      }

      // Consider only entries with a score at or above the threshold.
      // NaN means "no threshold"; it must be tested with Float.isNaN because
      // a comparison against Float.NaN with != is always true.
      if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold) {
        return;
      }

      // consider only entries with a retry (or fetch) interval lower than threshold
      if (intervalThreshold != -1 && crawlDatum.getFetchInterval() > intervalThreshold) {
        return;
      }

      // sort by decreasing score, using DecreasingFloatComparator
      sortValue.set(sort);
      // record generation time
      crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
      entry.datum = crawlDatum;
      entry.url = key;
      output.collect(sortValue, entry); // invert for sort by score
    }

 

  /**
   * Partition by host / domain or IP.
   *
   * <p>The decision is delegated to the wrapped partitioner, which is handed
   * the URL of the selector entry rather than the score key.
   */
    public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
      final SelectorEntry selected = (SelectorEntry) value;
      final Text selectedUrl = selected.url;
      return partitioner.getPartition(selectedUrl, key, numReduceTasks);
    }

 

    

 

/**
 * Reads the Selector settings from the job configuration and initialises the
 * filters, normalizers, scoring filters, partitioner and fetch schedule.
 *
 * @param job the job configuration to read from
 */
public void configure(JobConf job) {
      curTime = job.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());

      // The overall topN is split evenly across the reduce tasks.
      final long topN = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE);
      limit = topN / job.getNumReduceTasks();

      maxCount = job.getInt(GENERATOR_MAX_COUNT, -1);
      // Backward compatibility: fall back to the old per-host parameter when
      // the new one is not set.
      final int legacyMaxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);
      if (maxCount == -1 && legacyMaxPerHost != -1) {
        maxCount = legacyMaxPerHost;
        byDomain = false;
      }
      // An explicit "domain" count mode wins over the fallback above.
      final String countMode = job.get(GENERATOR_COUNT_MODE);
      if (GENERATOR_COUNT_VALUE_DOMAIN.equals(countMode)) {
        byDomain = true;
      }

      filters = new URLFilters(job);
      normalise = job.getBoolean(GENERATOR_NORMALISE, true);
      if (normalise) {
        normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
      }
      scfilters = new ScoringFilters(job);
      partitioner.configure(job);
      filter = job.getBoolean(GENERATOR_FILTER, true);

      // The configured delay (default 7) is multiplied out to milliseconds
      // as a number of days.
      final long delayDays = job.getLong(GENERATOR_DELAY, 7L);
      genDelay = delayDays * 3600L * 24L * 1000L;

      // Only a positive generate time from the job is written into genTime.
      final long generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
      if (generateTime > 0) {
        genTime.set(generateTime);
      }

      schedule = FetchScheduleFactory.getFetchSchedule(job);
      scoreThreshold = job.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
      intervalThreshold = job.getInt(GENERATOR_MIN_INTERVAL, -1);
      restrictStatus = job.get(GENERATOR_RESTRICT_STATUS, null);

      // One URL counter per segment that may be generated.
      maxNumSegments = job.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
      segCounts = new int[maxNumSegments];
    }

 

 

    /**
     * Collect entries until the per-reducer limit is reached, assigning each
     * kept entry a segment number.
     *
     * <p>When {@code maxCount} is positive, at most {@code maxCount} URLs per
     * host (or per domain, if {@code byDomain} is set) go into one segment;
     * further URLs of that host spill into the next segment while one is
     * available and are skipped otherwise. URLs that cannot be parsed are
     * counted under the "Generator"/"MALFORMED_URL" counter and skipped.
     *
     * @param key the sort value shared by the given entries
     * @param values the entries emitted for that sort value
     * @param output receives the kept entries with {@code segnum} filled in
     * @param reporter used to increment the malformed-URL counter
     * @throws IOException if collecting the output fails
     */
    public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
        OutputCollector<FloatWritable,SelectorEntry> output, Reporter reporter)
        throws IOException {

      while (values.hasNext()) {

        if (count == limit) {
          // do we have any segments left?
          if (currentsegmentnum < maxNumSegments) {
            count = 0;
            currentsegmentnum++;
          } else {
            break;
          }
        }

        SelectorEntry entry = values.next();
        String urlString = entry.url.toString();
        String hostordomain;

        try {
          if (normalise && normalizers != null) {
            urlString = normalizers.normalize(urlString,
                URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
          }
          // Parse once and reuse the result for both host and domain lookup.
          URL u = new URL(urlString);
          hostordomain = byDomain ? URLUtil.getDomainName(u) : u.getHost();
        } catch (Exception e) {
          LOG.warn("Malformed URL: '" + urlString + "', skipping ("
              + StringUtils.stringifyException(e) + ")");
          reporter.getCounter("Generator", "MALFORMED_URL").increment(1);
          continue;
        }

        // Locale-independent lower-casing, so the map key does not depend on
        // the JVM's default locale.
        hostordomain = hostordomain.toLowerCase(java.util.Locale.ROOT);

        // only filter if we are counting hosts or domains
        if (maxCount > 0) {
          // hostCount[0] = segment currently used for this host / domain,
          // hostCount[1] = URLs of this host / domain counted in that segment.
          int[] hostCount = hostCounts.get(hostordomain);
          if (hostCount == null) {
            hostCount = new int[] {1, 0};
            hostCounts.put(hostordomain, hostCount);
          }

          // increment hostCount
          hostCount[1]++;

          // check if topN reached, select next segment if it is
          while (segCounts[hostCount[0] - 1] >= limit && hostCount[0] < maxNumSegments) {
            hostCount[0]++;
            hostCount[1] = 0;
          }

          // Exceeded the allowed URLs per host / domain for this segment.
          // The comparison is strict so that exactly maxCount URLs are kept
          // per segment (">=" would keep only maxCount - 1 in the first one).
          if (hostCount[1] > maxCount) {
            if (hostCount[0] < maxNumSegments) {
              // Spill into the next segment; the current URL is its first.
              hostCount[0]++;
              hostCount[1] = 1;
            } else {
              // Log only once per host / domain: on the first skipped URL.
              if (hostCount[1] == maxCount + 1 && LOG.isInfoEnabled()) {
                LOG.info("Host or domain " + hostordomain + " has more than " + maxCount
                    + " URLs for all " + maxNumSegments + " segments. Additional URLs won't be included in the fetchlist.");
              }
              // skip this entry
              continue;
            }
          }
          entry.segnum = new IntWritable(hostCount[0]);
          segCounts[hostCount[0] - 1]++;
        } else {
          entry.segnum = new IntWritable(currentsegmentnum);
          segCounts[currentsegmentnum - 1]++;
        }

        output.collect(key, entry);

        // Count is incremented only when we keep the URL
        // maxCount may cause us to skip it.
        count++;
      }
    }
  }

 

   可以通过上述代码看出,Generator的第一个Job,实现的逻辑如下:

                   1.根据条件(URL过滤器、抓取时间、得分阈值等)过滤掉不满足要求的url

                   2.根据配置生成相应数量的segments

                   3.计算出每个url所属的segments

 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics