Hadoop ReduceTask

 
 public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    this.umbilical = umbilical;
    job.setBoolean("mapred.skip.on", isSkipping());

    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
        jvmContext);
    reporter.startCommunicationThread();
    boolean useNewApi = job.getUseNewReducer();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }
    
    // Initialize the codec
    codec = initCodec();

    boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
    if (!isLocal) {
      reduceCopier = new ReduceCopier(umbilical, job, reporter);
      if (!reduceCopier.fetchOutputs()) {
        if(reduceCopier.mergeThrowable instanceof FSError) {
          throw (FSError)reduceCopier.mergeThrowable;
        }
        throw new IOException("Task: " + getTaskID() + 
            " - The reduce copier failed", reduceCopier.mergeThrowable);
      }
    }
    copyPhase.complete();                         // copy is already complete
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);

    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
    RawKeyValueIterator rIter = isLocal
      ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
          !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
          new Path(getTaskID().toString()), job.getOutputKeyComparator(),
          reporter, spilledRecordsCounter, null)
      : reduceCopier.createKVIterator(job, rfs, reporter);
        
    // free up the data structures
    mapOutputFilesOnDisk.clear();
    
    sortPhase.complete();                         // sort is complete
    setPhase(TaskStatus.Phase.REDUCE); 
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    RawComparator comparator = job.getOutputValueGroupingComparator();

    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }
    done(umbilical, reporter);
  }

 

 

ReduceCopier.fetchOutputs

   public boolean fetchOutputs() throws IOException {
      int totalFailures = 0;
      int            numInFlight = 0, numCopied = 0;
      DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
      final Progress copyPhase = 
        reduceTask.getProgress().phase();
      LocalFSMerger localFSMergerThread = null;
      InMemFSMergeThread inMemFSMergeThread = null;
      GetMapEventsThread getMapEventsThread = null;
      
      for (int i = 0; i < numMaps; i++) {
        copyPhase.addPhase();       // add sub-phase per file
      }
      
      copiers = new ArrayList<MapOutputCopier>(numCopiers);
      
      // start all the copying threads
      for (int i=0; i < numCopiers; i++) {
        MapOutputCopier copier = new MapOutputCopier(conf, reporter, 
            reduceTask.getJobTokenSecret());
        copiers.add(copier);
        copier.start();
      }
      
      //start the on-disk-merge thread
      localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
      //start the in memory merger thread
      inMemFSMergeThread = new InMemFSMergeThread();
      localFSMergerThread.start();
      inMemFSMergeThread.start();
      
      // start the map events thread
      getMapEventsThread = new GetMapEventsThread();
      getMapEventsThread.start();
      
      // start the clock for bandwidth measurement
      long startTime = System.currentTimeMillis();
      long currentTime = startTime;
      long lastProgressTime = startTime;
      long lastOutputTime = 0;
      
        // loop until we get all required outputs
        while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
          int numEventsAtStartOfScheduling;
          synchronized (copyResultsOrNewEventsLock) {
            numEventsAtStartOfScheduling = numEventsFetched;
          }
          
          currentTime = System.currentTimeMillis();
          boolean logNow = false;
          if (currentTime - lastOutputTime > MIN_LOG_TIME) {
            lastOutputTime = currentTime;
            logNow = true;
          }
          if (logNow) {
            LOG.info(reduceTask.getTaskID() + " Need another " 
                   + (numMaps - copiedMapOutputs.size()) + " map output(s) "
                   + "where " + numInFlight + " is already in progress");
          }

          // Put the hash entries for the failed fetches.
          Iterator<MapOutputLocation> locItr = retryFetches.iterator();

          while (locItr.hasNext()) {
            MapOutputLocation loc = locItr.next(); 
            List<MapOutputLocation> locList = 
              mapLocations.get(loc.getHost());
            
            // Check if the list exists. Map output location mapping is cleared 
            // once the jobtracker restarts and is rebuilt from scratch.
            // Note that map-output-location mapping will be recreated and hence
            // we continue with the hope that we might find some locations
            // from the rebuild map.
            if (locList != null) {
              // Add to the beginning of the list so that this map is 
              //tried again before the others and we can hasten the 
              //re-execution of this map should there be a problem
              locList.add(0, loc);
            }
          }

          if (retryFetches.size() > 0) {
            LOG.info(reduceTask.getTaskID() + ": " +  
                  "Got " + retryFetches.size() +
                  " map-outputs from previous failures");
          }
          // clear the "failed" fetches hashmap
          retryFetches.clear();

          // now walk through the cache and schedule what we can
          int numScheduled = 0;
          int numDups = 0;
          
          synchronized (scheduledCopies) {
  
            // Randomize the map output locations to prevent 
            // all reduce-tasks swamping the same tasktracker
            List<String> hostList = new ArrayList<String>();
            hostList.addAll(mapLocations.keySet()); 
            
            Collections.shuffle(hostList, this.random);
              
            Iterator<String> hostsItr = hostList.iterator();

            while (hostsItr.hasNext()) {
            
              String host = hostsItr.next();

              List<MapOutputLocation> knownOutputsByLoc = 
                mapLocations.get(host);

              // Check if the list exists. Map output location mapping is 
              // cleared once the jobtracker restarts and is rebuilt from 
              // scratch.
              // Note that map-output-location mapping will be recreated and 
              // hence we continue with the hope that we might find some 
              // locations from the rebuild map and add then for fetching.
              if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) {
                continue;
              }
              
              //Identify duplicate hosts here
              if (uniqueHosts.contains(host)) {
                 numDups += knownOutputsByLoc.size(); 
                 continue;
              }

              Long penaltyEnd = penaltyBox.get(host);
              boolean penalized = false;
            
              if (penaltyEnd != null) {
                if (currentTime < penaltyEnd.longValue()) {
                  penalized = true;
                } else {
                  penaltyBox.remove(host);
                }
              }
              
              if (penalized)
                continue;

              synchronized (knownOutputsByLoc) {
              
                locItr = knownOutputsByLoc.iterator();
            
                while (locItr.hasNext()) {
              
                  MapOutputLocation loc = locItr.next();
              
                  // Do not schedule fetches from OBSOLETE maps
                  if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
                    locItr.remove();
                    continue;
                  }

                  uniqueHosts.add(host);
                  scheduledCopies.add(loc);
                  locItr.remove();  // remove from knownOutputs
                  numInFlight++; numScheduled++;

                  break; //we have a map from this host
                }
              }
            }
            scheduledCopies.notifyAll();
          }

          if (numScheduled > 0 || logNow) {
            LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
                   " outputs (" + penaltyBox.size() +
                   " slow hosts and" + numDups + " dup hosts)");
          }

          if (penaltyBox.size() > 0 && logNow) {
            LOG.info("Penalized(slow) Hosts: ");
            for (String host : penaltyBox.keySet()) {
              LOG.info(host + " Will be considered after: " + 
                  ((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
            }
          }

          // if we have no copies in flight and we can't schedule anything
          // new, just wait for a bit
          try {
            if (numInFlight == 0 && numScheduled == 0) {
              // we should indicate progress as we don't want TT to think
              // we're stuck and kill us
              reporter.progress();
              Thread.sleep(5000);
            }
          } catch (InterruptedException e) { } // IGNORE
          
          while (numInFlight > 0 && mergeThrowable == null) {
            LOG.debug(reduceTask.getTaskID() + " numInFlight = " + 
                      numInFlight);
            //the call to getCopyResult will either 
            //1) return immediately with a null or a valid CopyResult object,
            //                 or
            //2) if the numInFlight is above maxInFlight, return with a 
            //   CopyResult object after getting a notification from a 
            //   fetcher thread, 
            //So, when getCopyResult returns null, we can be sure that
            //we aren't busy enough and we should go and get more mapcompletion
            //events from the tasktracker
            CopyResult cr = getCopyResult(numInFlight, numEventsAtStartOfScheduling);

            if (cr == null) {
              break;
            }
            
            if (cr.getSuccess()) {  // a successful copy
              numCopied++;
              lastProgressTime = System.currentTimeMillis();
              reduceShuffleBytes.increment(cr.getSize());
                
              long secsSinceStart = 
                (System.currentTimeMillis()-startTime)/1000+1;
              float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
              float transferRate = mbs/secsSinceStart;
                
              copyPhase.startNextPhase();
              copyPhase.setStatus("copy (" + numCopied + " of " + numMaps 
                                  + " at " +
                                  mbpsFormat.format(transferRate) +  " MB/s)");
                
              // Note successful fetch for this mapId to invalidate
              // (possibly) old fetch-failures
              fetchFailedMaps.remove(cr.getLocation().getTaskId());
            } else if (cr.isObsolete()) {
              //ignore
              LOG.info(reduceTask.getTaskID() + 
                       " Ignoring obsolete copy result for Map Task: " + 
                       cr.getLocation().getTaskAttemptId() + " from host: " + 
                       cr.getHost());
            } else {
              retryFetches.add(cr.getLocation());
              
              // note the failed-fetch
              TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
              TaskID mapId = cr.getLocation().getTaskId();
              
              totalFailures++;
              Integer noFailedFetches = 
                mapTaskToFailedFetchesMap.get(mapTaskId);
              noFailedFetches = 
                (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
              mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
              LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
                       noFailedFetches + " from " + mapTaskId);

              if (noFailedFetches >= abortFailureLimit) {
                LOG.fatal(noFailedFetches + " failures downloading "
                          + getTaskID() + ".");
                umbilical.shuffleError(getTaskID(),
                                 "Exceeded the abort failure limit;"
                                 + " bailing-out.", jvmContext);
              }
              
              checkAndInformJobTracker(noFailedFetches, mapTaskId,
                  cr.getError().equals(CopyOutputErrorType.READ_ERROR));

              // note unique failed-fetch maps
              if (noFailedFetches == maxFetchFailuresBeforeReporting) {
                fetchFailedMaps.add(mapId);
                  
                // did we have too many unique failed-fetch maps?
                // and did we fail on too many fetch attempts?
                // and did we progress enough
                //     or did we wait for too long without any progress?
               
                // check if the reducer is healthy
                boolean reducerHealthy = 
                    (((float)totalFailures / (totalFailures + numCopied)) 
                     < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
                
                // check if the reducer has progressed enough
                boolean reducerProgressedEnough = 
                    (((float)numCopied / numMaps) 
                     >= MIN_REQUIRED_PROGRESS_PERCENT);
                
                // check if the reducer is stalled for a long time
                // duration for which the reducer is stalled
                int stallDuration = 
                    (int)(System.currentTimeMillis() - lastProgressTime);
                // duration for which the reducer ran with progress
                int shuffleProgressDuration = 
                    (int)(lastProgressTime - startTime);
                // min time the reducer should run without getting killed
                int minShuffleRunDuration = 
                    (shuffleProgressDuration > maxMapRuntime) 
                    ? shuffleProgressDuration 
                    : maxMapRuntime;
                boolean reducerStalled = 
                    (((float)stallDuration / minShuffleRunDuration) 
                     >= MAX_ALLOWED_STALL_TIME_PERCENT);
                
                // kill if not healthy and has insufficient progress
                if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
                     fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
                    && !reducerHealthy 
                    && (!reducerProgressedEnough || reducerStalled)) { 
                  LOG.fatal("Shuffle failed with too many fetch failures " + 
                            "and insufficient progress!" +
                            "Killing task " + getTaskID() + ".");
                  umbilical.shuffleError(getTaskID(), 
                                         "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
                                         + " bailing-out.", jvmContext);
                }

              }
                
              currentTime = System.currentTimeMillis();
              long currentBackOff = (long)(INITIAL_PENALTY *
                  Math.pow(PENALTY_GROWTH_RATE, noFailedFetches));

              penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
              LOG.warn(reduceTask.getTaskID() + " adding host " +
                       cr.getHost() + " to penalty box, next contact in " +
                       (currentBackOff/1000) + " seconds");
            }
            uniqueHosts.remove(cr.getHost());
            numInFlight--;
          }
        }
        
        // all done, inform the copiers to exit
        exitGetMapEvents = true;
        try {
          getMapEventsThread.join();
          LOG.info("getMapsEventsThread joined.");
        } catch (InterruptedException ie) {
          LOG.info("getMapsEventsThread threw an exception: " +
              StringUtils.stringifyException(ie));
        }

        synchronized (copiers) {
          synchronized (scheduledCopies) {
            for (MapOutputCopier copier : copiers) {
              copier.interrupt();
            }
            copiers.clear();
          }
        }
        
        // copiers are done, exit and notify the waiting merge threads
        synchronized (mapOutputFilesOnDisk) {
          exitLocalFSMerge = true;
          mapOutputFilesOnDisk.notify();
        }
        
        ramManager.close();
        
        //Do a merge of in-memory files (if there are any)
        if (mergeThrowable == null) {
          try {
            // Wait for the on-disk merge to complete
            localFSMergerThread.join();
            LOG.info("Interleaved on-disk merge complete: " + 
                     mapOutputFilesOnDisk.size() + " files left.");
            
            //wait for an ongoing merge (if it is in flight) to complete
            inMemFSMergeThread.join();
            LOG.info("In-memory merge complete: " + 
                     mapOutputsFilesInMemory.size() + " files left.");
          } catch (InterruptedException ie) {
            LOG.warn(reduceTask.getTaskID() +
                     " Final merge of the inmemory files threw an exception: " +
                     StringUtils.stringifyException(ie));
            // record the interrupt only if the merge did not already fail
            if (mergeThrowable == null) {
              mergeThrowable = ie;
            }
            return false;
          }
        }
        return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
    }

 

Like the map side, the reducer reads its data through an iterator; the difference is that the reducer sorts (merges) the data as it reads it.
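
As a rough illustration of that sort-as-you-read pattern (plain Java, not the Hadoop classes; the pair type and the equals-based grouping are stand-ins for the real grouping comparator), the reducer effectively walks an already-sorted stream and groups consecutive equal keys into one reduce() call:

// Minimal sketch, not Hadoop code: consume a sorted stream of (key, value)
// pairs through an iterator and group consecutive equal keys, which is what
// the reducer effectively does on top of the merge iterator.
import java.util.*;

public class GroupingSketch {
  public static void main(String[] args) {
    // assume these pairs already arrive in key order from the merge
    List<Map.Entry<String, Integer>> sorted = Arrays.asList(
        Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 5));

    Iterator<Map.Entry<String, Integer>> it = sorted.iterator();
    Map.Entry<String, Integer> cur = it.hasNext() ? it.next() : null;
    while (cur != null) {
      String key = cur.getKey();
      List<Integer> values = new ArrayList<>();
      // keep pulling while the key compares equal (the grouping comparator in Hadoop)
      while (cur != null && cur.getKey().equals(key)) {
        values.add(cur.getValue());
        cur = it.hasNext() ? it.next() : null;
      }
      System.out.println(key + " -> " + values);  // stands in for reduce(key, values)
    }
  }
}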

 

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    // wrap value iterator to report progress.
    final RawKeyValueIterator rawIter = rIter;
    rIter = new RawKeyValueIterator() {
      public void close() throws IOException {
        rawIter.close();
      }
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reducePhase.set(rawIter.getProgress().get());
        reporter.progress();
        return ret;
      }
    };
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
    // make a reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
      (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
        outputFormat.getRecordWriter(taskContext);
     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
       new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter);
    job.setBoolean("mapred.skip.on", isSkipping());
    org.apache.hadoop.mapreduce.Reducer.Context 
         reducerContext = createReduceContext(reducer, job, getTaskID(),
                                               rIter, reduceInputKeyCounter,
                                               reduceInputValueCounter, 
                                               trackedRW, committer,
                                               reporter, comparator, keyClass,
                                               valueClass);
    reducer.run(reducerContext);
    output.close(reducerContext);
  }
 
createKVIterator mainly performs the merge. It uses k-way merging driven by a min-heap and returns an iterator; each call to next() pops the current minimum from the min-heap. The data itself is wrapped in Segment objects.
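
A minimal sketch of that min-heap k-way merge, using a hypothetical Segment stand-in (each segment individually sorted) rather than the real Merger/Segment classes:

// Minimal sketch, not the Merger implementation: a k-way merge over "segments"
// using a min-heap (PriorityQueue). Each step pops the segment whose current
// record is smallest, emits that record, then re-inserts the segment if it
// still has data. This is what each next() of the merged iterator conceptually does.
import java.util.*;

public class MinHeapMergeSketch {
  static class Segment {
    final Iterator<String> records;  // each segment is individually sorted
    String current;
    Segment(List<String> sortedRecords) {
      this.records = sortedRecords.iterator();
      advance();
    }
    boolean advance() {
      current = records.hasNext() ? records.next() : null;
      return current != null;
    }
  }

  public static void main(String[] args) {
    PriorityQueue<Segment> heap =
        new PriorityQueue<>(Comparator.comparing((Segment s) -> s.current));
    heap.add(new Segment(Arrays.asList("a", "d", "f")));
    heap.add(new Segment(Arrays.asList("b", "c", "g")));
    heap.add(new Segment(Arrays.asList("e", "h")));

    while (!heap.isEmpty()) {
      Segment top = heap.poll();       // smallest current record across all segments
      System.out.println(top.current);
      if (top.advance()) {
        heap.add(top);                 // re-insert with its next (larger) record
      }
    }
  }
}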

 

private RawKeyValueIterator createKVIterator(
        JobConf job, FileSystem fs, Reporter reporter) throws IOException {

      // merge config params
      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
      boolean keepInputs = job.getKeepFailedTaskFiles();
      final Path tmpDir = new Path(getTaskID().toString());
      final RawComparator<K> comparator =
        (RawComparator<K>)job.getOutputKeyComparator();

      // segments required to vacate memory
      List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
      long inMemToDiskBytes = 0;
      if (mapOutputsFilesInMemory.size() > 0) {
        TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
        inMemToDiskBytes = createInMemorySegments(memDiskSegments,
            maxInMemReduce);
        final int numMemDiskSegments = memDiskSegments.size();
        if (numMemDiskSegments > 0 &&
              ioSortFactor > mapOutputFilesOnDisk.size()) {
          // must spill to disk, but can't retain in-mem for intermediate merge
          final Path outputPath =
              mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
          final RawKeyValueIterator rIter = Merger.merge(job, fs,
              keyClass, valueClass, memDiskSegments, numMemDiskSegments,
              tmpDir, comparator, reporter, spilledRecordsCounter, null);
          final Writer writer = new Writer(job, fs, outputPath,
              keyClass, valueClass, codec, null);
          try {
            Merger.writeFile(rIter, writer, reporter, job);
            addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
          } catch (Exception e) {
            if (null != outputPath) {
              fs.delete(outputPath, true);
            }
            throw new IOException("Final merge failed", e);
          } finally {
            if (null != writer) {
              writer.close();
            }
          }
          LOG.info("Merged " + numMemDiskSegments + " segments, " +
                   inMemToDiskBytes + " bytes to disk to satisfy " +
                   "reduce memory limit");
          inMemToDiskBytes = 0;
          memDiskSegments.clear();
        } else if (inMemToDiskBytes != 0) {
          LOG.info("Keeping " + numMemDiskSegments + " segments, " +
                   inMemToDiskBytes + " bytes in memory for " +
                   "intermediate, on-disk merge");
        }
      }

      // segments on disk
      List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
      long onDiskBytes = inMemToDiskBytes;
      Path[] onDisk = getMapFiles(fs, false);
      for (Path file : onDisk) {
        onDiskBytes += fs.getFileStatus(file).getLen();
        diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs));
      }
      LOG.info("Merging " + onDisk.length + " files, " +
               onDiskBytes + " bytes from disk");
      Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
        public int compare(Segment<K, V> o1, Segment<K, V> o2) {
          if (o1.getLength() == o2.getLength()) {
            return 0;
          }
          return o1.getLength() < o2.getLength() ? -1 : 1;
        }
      });

      // build final list of segments from merged backed by disk + in-mem
      List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
      long inMemBytes = createInMemorySegments(finalSegments, 0);
      LOG.info("Merging " + finalSegments.size() + " segments, " +
               inMemBytes + " bytes from memory into reduce");
      if (0 != onDiskBytes) {
        final int numInMemSegments = memDiskSegments.size();
        diskSegments.addAll(0, memDiskSegments);
        memDiskSegments.clear();
        RawKeyValueIterator diskMerge = Merger.merge(
            job, fs, keyClass, valueClass, codec, diskSegments,
            ioSortFactor, numInMemSegments, tmpDir, comparator,
            reporter, false, spilledRecordsCounter, null);
        diskSegments.clear();
        if (0 == finalSegments.size()) {
          return diskMerge;
        }
        finalSegments.add(new Segment<K,V>(
              new RawKVIteratorReader(diskMerge, onDiskBytes), true));
      }
      return Merger.merge(job, fs, keyClass, valueClass,
                   finalSegments, finalSegments.size(), tmpDir,
                   comparator, reporter, spilledRecordsCounter, null);
    }
  

 
