jiwenke

An Overview of the JobTracker-Side Source Code in Hadoop mapred

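In the previous section we saw how the TaskTracker starts a new task. Here we continue by looking at how the JobTracker responds and schedules. In Hadoop, tasks are obtained in a pull fashion: the TaskTracker asks for work.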
    
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
                                                            justStarted, askForNewTask, 
                                                              heartbeatResponseId);

This is where the TaskTracker sends its heartbeat to the JobTracker. The call goes over RPC, which brings us to the JobTracker:
  
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
                                                  boolean initialContact, boolean acceptNewTasks, short responseId) 
    throws IOException {
.............
    //If the tracker accepts new tasks, let the JobTracker schedule them; this ends up calling the taskScheduler's assignTasks
    if (acceptNewTasks) {
      TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
      if (taskTrackerStatus == null) {
        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
      } else {
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
    //Tasks are assigned here; the configured scheduler decides how to schedule them
        if (tasks == null ) {
         tasks = taskScheduler.assignTasks(taskTrackerStatus);
        }
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
            actions.add(new LaunchTaskAction(task));
          }
        }
      }
    }
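
To make the pull model concrete, here is a minimal sketch of the pattern, not Hadoop's actual code. The `Master` class, its `submit` method and the string task IDs are all hypothetical stand-ins: the master never pushes work, and only hands out a task when a worker's heartbeat says it is willing to accept one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of the pull pattern; these are not Hadoop classes.
class PullHeartbeatSketch {

    /** Stands in for the JobTracker: it only hands out work when a worker asks. */
    static class Master {
        private final Queue<String> pendingTasks = new ArrayDeque<>();

        synchronized void submit(String taskId) {
            pendingTasks.add(taskId);
        }

        /** Loosely analogous to heartbeat(): returns the actions the worker should perform. */
        synchronized List<String> heartbeat(String workerName, boolean acceptNewTasks) {
            List<String> actions = new ArrayList<>();
            if (acceptNewTasks) {
                String task = pendingTasks.poll(); // null when nothing is queued
                if (task != null) {
                    actions.add("launch:" + task);
                }
            }
            return actions;
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        master.submit("task_0001");
        // A busy worker reports in but does not ask for work, so the task stays queued.
        System.out.println(master.heartbeat("tracker_a", false));
        // A worker with a free slot asks for work and receives the queued task.
        System.out.println(master.heartbeat("tracker_b", true));
    }
}
```

The design point is that the worker controls the pace: the master keeps no outbound connections to workers and only answers their calls.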

The taskScheduler used here is the default one, created like this:
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass,conf);
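
The idea behind the line above, picking an implementation class by name and instantiating it reflectively, can be sketched with plain Java reflection. This is an illustration under assumptions, not Hadoop's `ReflectionUtils`: the `Scheduler` interface, the `FifoScheduler` class and the use of `java.util.Properties` as a stand-in for the job configuration are all hypothetical.

```java
import java.util.Properties;

// Hypothetical sketch of config-driven instantiation; not Hadoop's ReflectionUtils.
class ReflectiveSchedulerSketch {

    interface Scheduler {
        String name();
    }

    /** Illustrative default implementation, used when the key is not configured. */
    static class FifoScheduler implements Scheduler {
        public String name() {
            return "fifo";
        }
    }

    // The key is the one named in the article; Properties stands in for the real configuration.
    static final String KEY = "mapred.jobtracker.taskScheduler";

    static Scheduler load(Properties conf) {
        String className = conf.getProperty(KEY, FifoScheduler.class.getName());
        try {
            Class<? extends Scheduler> cls =
                Class.forName(className).asSubclass(Scheduler.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create scheduler " + className, e);
        }
    }

    public static void main(String[] args) {
        // No key set, so the default implementation is chosen.
        System.out.println(load(new Properties()).name());
    }
}
```

Swapping the scheduler then only requires changing the configured class name, with no change to the calling code.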

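The scheduler class is specified in the configuration file under "mapred.jobtracker.taskScheduler". It is usually JobQueueTaskScheduler, Hadoop's own FIFO scheduler implementation. Let's see how this scheduler implements assignTasks: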
  
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
      throws IOException {

    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    int numTaskTrackers = clusterStatus.getTaskTrackers();

    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();

    //
    // Get map + reduce counts for the current tracker.
    //
    int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
    int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
    int numMaps = taskTracker.countMapTasks();
    int numReduces = taskTracker.countReduceTasks();

    //
    // Compute average map and reduce task numbers across pool
    //
    int remainingReduceLoad = 0;
    int remainingMapLoad = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          int totalMapTasks = job.desiredMaps();
          int totalReduceTasks = job.desiredReduces();
          remainingMapLoad += (totalMapTasks - job.finishedMaps());
          remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
        }
      }
    }

    // find out the maximum number of maps or reduces that we are willing
    // to run on any node.
    int maxMapLoad = 0;
    int maxReduceLoad = 0;
    if (numTaskTrackers > 0) {
      maxMapLoad = Math.min(maxCurrentMapTasks,
                            (int) Math.ceil((double) remainingMapLoad / 
                                            numTaskTrackers));
      maxReduceLoad = Math.min(maxCurrentReduceTasks,
                               (int) Math.ceil((double) remainingReduceLoad
                                               / numTaskTrackers));
    }
        
    int totalMaps = clusterStatus.getMapTasks();
    int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
    int totalReduces = clusterStatus.getReduceTasks();
    int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();

    //
    // In the below steps, we allocate first a map task (if appropriate),
    // and then a reduce task if appropriate.  We go through all jobs
    // in order of job arrival; jobs only get serviced if their 
    // predecessors are serviced, too.
    //

    //
    // We hand a task to the current taskTracker if the given machine 
    // has a workload that's less than the maximum load of that kind of
    // task.
    //
       
    if (numMaps < maxMapLoad) {

      int totalNeededMaps = 0;
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }
      //This is where the Task is obtained; it has to be fetched from the job
          Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
              taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            return Collections.singletonList(t);
          }

          //
          // Beyond the highest-priority task, reserve a little 
          // room for failures and speculative executions; don't 
          // schedule tasks to the hilt.
          //
          totalNeededMaps += job.desiredMaps();
          int padding = 0;
          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
            padding = Math.min(maxCurrentMapTasks,
                               (int)(totalNeededMaps * padFraction));
          }
          if (totalMaps + padding >= totalMapTaskCapacity) {
            break;
          }
        }
      }
    }

    //
    // Same thing, but for reduce tasks
    //
    if (numReduces < maxReduceLoad) {

      int totalNeededReduces = 0;
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
              job.numReduceTasks == 0) {
            continue;
          }

          Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
              taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            return Collections.singletonList(t);
          }

          //
          // Beyond the highest-priority task, reserve a little 
          // room for failures and speculative executions; don't 
          // schedule tasks to the hilt.
          //
          totalNeededReduces += job.desiredReduces();
          int padding = 0;
          if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
            padding = 
              Math.min(maxCurrentReduceTasks,
                       (int) (totalNeededReduces * padFraction));
          }
          if (totalReduces + padding >= totalReduceTaskCapacity) {
            break;
          }
        }
      }
    }
    return null;
  }
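
The two pieces of arithmetic in assignTasks, the per-tracker load cap and the padding reserved for failures and speculative execution, are easier to follow with numbers plugged in. The sketch below copies only the formulas from the listing above; the helper names and the sample values for the pad fraction and minimum cluster size are illustrative assumptions, not Hadoop's defaults.

```java
// Worked example of the load-cap and padding formulas from assignTasks.
// Helper names and sample values are illustrative assumptions.
class LoadCapSketch {

    /** min(slots on this tracker, ceil(remaining load / number of trackers)). */
    static int maxLoad(int trackerSlots, int remainingLoad, int numTaskTrackers) {
        if (numTaskTrackers <= 0) {
            return 0; // mirrors the initial value of maxMapLoad / maxReduceLoad
        }
        return Math.min(trackerSlots,
                        (int) Math.ceil((double) remainingLoad / numTaskTrackers));
    }

    /** Slots held back once the cluster is larger than the minimum size for padding. */
    static int padding(int trackerSlots, int totalNeeded, double padFraction,
                       int numTaskTrackers, int minClusterSizeForPadding) {
        if (numTaskTrackers <= minClusterSizeForPadding) {
            return 0;
        }
        return Math.min(trackerSlots, (int) (totalNeeded * padFraction));
    }

    public static void main(String[] args) {
        // 10 remaining maps over 4 trackers: ceil(2.5) = 3, below the 4 local slots.
        System.out.println(maxLoad(4, 10, 4));
        // Same load, but this tracker has only 2 slots, so the slot count wins.
        System.out.println(maxLoad(2, 10, 4));
        // 6 needed tasks with a hypothetical pad fraction of 0.5 on a 5-node cluster.
        System.out.println(padding(4, 6, 0.5, 5, 3));
    }
}
```

A tracker only receives a new map task while its running map count is below the value `maxLoad` returns, which is what spreads a small remaining workload across the cluster instead of filling one node.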

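To actually obtain a task, we have to go into JobInProgress and its obtainNewMapTask and obtainNewReduceTask methods, which need to query and process the state of the cluster.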