`
tcxiang
  • 浏览: 87965 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

hadoop maptask

 
阅读更多

hadoop MapTask

1.通过Job的inputFormmat获得对应InputFormat然后获得RecordReader

2.numReduceTasks从前面conf计算的得到,numReduceTasks>0就有n个partition来做shuffle,说明partition的个数是由reduceNum决定的。numReduceTasks为0,则明显是map直接输出的任务。 

 

 private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runOldMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, InterruptedException,
                             ClassNotFoundException {
    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
           splitIndex.getStartOffset());

    updateJobWithSplit(job, inputSplit);
    reporter.setInputSplit(inputSplit);

    RecordReader<INKEY,INVALUE> rawIn =                  // open input
      job.getInputFormat().getRecordReader(inputSplit, job, reporter);
    RecordReader<INKEY,INVALUE> in = isSkipping() ? 
        new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
        new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
    job.setBoolean("mapred.skip.on", isSkipping());


    int numReduceTasks = conf.getNumReduceTasks();
    LOG.info("numReduceTasks: " + numReduceTasks);
    MapOutputCollector collector = null;
    if (numReduceTasks > 0) {
      collector = new MapOutputBuffer(umbilical, job, reporter);
    } else { 
      collector = new DirectMapOutputCollector(umbilical, job, reporter);
    }
    MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
      ReflectionUtils.newInstance(job.getMapRunnerClass(), job);

    try {
      runner.run(in, new OldOutputCollector(collector, conf), reporter);
      collector.flush();
    } finally {
      //close
      in.close();                               // close input
      collector.close();
    }
  }

 

Q.前面方法调用getSplitDetail是为了获得InputSplit,这里有点看不懂

private <T> T getSplitDetails(Path file, long offset)
   throws IOException {
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream inFile = fs.open(file);
    inFile.seek(offset);
    String className = Text.readString(inFile);
    Class<T> cls;
    try {
      cls = (Class<T>) conf.getClassByName(className);
    } catch (ClassNotFoundException ce) {
      IOException wrap = new IOException("Split class " + className +
                                          " not found");
      wrap.initCause(ce);
      throw wrap;
    }
    SerializationFactory factory = new SerializationFactory(conf);
    Deserializer<T> deserializer = (Deserializer<T>) factory.getDeserializer(cls);
    deserializer.open(inFile);
    T split = deserializer.deserialize(null);
    long pos = inFile.getPos();
    getCounters().findCounter(Task.Counter.SPLIT_RAW_BYTES).increment(pos - offset);
    inFile.close();
    return split;
  }

 

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) 
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    // start thread that will handle communication with parent
    TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
        jvmContext);
    reporter.startCommunicationThread();
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

 

新api下的runMapper,将各种自定义的class信息都保存到conf里了,用动态代理的方式new mapper出来。

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
    // make a mapper
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());

    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
          (inputFormat.createRecordReader(split, taskContext), reporter);
    
    job.setBoolean("mapred.skip.on", isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
         mapperContext = null;
    try {
      Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
        org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
        (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
                     Configuration.class,
                     org.apache.hadoop.mapreduce.TaskAttemptID.class,
                     org.apache.hadoop.mapreduce.RecordReader.class,
                     org.apache.hadoop.mapreduce.RecordWriter.class,
                     org.apache.hadoop.mapreduce.OutputCommitter.class,
                     org.apache.hadoop.mapreduce.StatusReporter.class,
                     org.apache.hadoop.mapreduce.InputSplit.class});

      // get an output object
      if (job.getNumReduceTasks() == 0) {
         output =
           new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
      } else {
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
      }

      mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                     input, output, committer,
                                                     reporter, split);

      input.initialize(split, mapperContext);
      mapper.run(mapperContext);
      input.close();
      output.close(mapperContext);
    } catch (NoSuchMethodException e) {
      throw new IOException("Can't find Context constructor", e);
    } catch (InstantiationException e) {
      throw new IOException("Can't create Context", e);
    } catch (InvocationTargetException e) {
      throw new IOException("Can't invoke Context constructor", e);
    } catch (IllegalAccessException e) {
      throw new IOException("Can't invoke Context constructor", e);
    }
  }

 

分享到:
评论

相关推荐

    Hadoop源代码分析(MapTask辅助类 I)

    ### Hadoop MapTask辅助类源代码分析 #### 一、概述 Hadoop作为一个分布式计算框架,其核心组件之一是MapReduce。MapReduce负责处理大规模数据集的并行运算任务,而MapTask作为MapReduce的核心组成部分之一,其...

    Hadoop源代码分析(MapTask)

    Hadoop源代码分析(MapTask) Hadoop的MapTask类是Hadoop MapReduce框架中的一部分,负责执行Map任务。MapTask类继承自Task类,是MapReduce框架中的一个重要组件。本文将对MapTask类的源代码进行分析,了解其内部...

    Hadoop源代码分析(MapTask辅助类,III)

    ### Hadoop源代码分析——MapTask辅助类输出机制详解 #### 概述 本文将深入探讨Hadoop MapReduce框架中的MapTask辅助类中与键值对(Key-Value,简称KV)输出相关的源代码实现细节。这部分内容对于理解Hadoop内部...

    MapTask工作机制图文详解

    MapTask工作机制是Hadoop MapReduce框架中的一个关键组件,负责将输入数据处理并输出结果。本文将详细介绍MapTask工作机制的五个阶段:Read阶段、Map阶段、Collect收集阶段、Spill阶段和Combine阶段。 Read阶段 在...

    Hadoop源代码分析(MapTask辅助类,II)

    在Hadoop中,MapTask是MapReduce框架的关键组件,负责执行Mapper阶段的工作。MapTask辅助类,特别是MapOutputBuffer,是Mapper输出数据管理的核心部分。本文将继续深入分析MapOutputBuffer的内部实现,以便理解...

    远程调用执行Hadoop Map/Reduce

    例如,`org.apache.hadoop.mapred.MapTask`和`org.apache.hadoop.mapreduce.ReduceTask`分别对应Map和Reduce任务的实现,开发者可以通过阅读这些源码了解任务执行的详细流程。 7. **工具集成**:有许多开源工具可以...

    Hadoop运行流程详解

    - MapTask和ReduceTask:Mapper和Combiner(如有配置)由MapTask调用,Reducer由ReduceTask调用。Mapper读取输入数据,经过处理后生成中间结果,若配置了Combiner,则在Mapper本地进行预聚合。ReduceTask接收Map...

    Hadoop云计算2.0笔记第一课Hadoop介绍

    TaskTracker 是任务跟踪器,负责运行 Map Task 和 Reduce Task,与 JobTracker 交互,执行命令,并汇报任务状态。 6. Map 和 Reduce 任务: Map 任务负责解析每条数据记录,传递给用户编写的 map(),将 map() 输出...

    提高hadoop的mapreduce job效率笔记

    当某些 Task 运行缓慢时,Hadoop 可以启动额外的 Task 实例来尝试替换它们。这有助于减少整体作业时间,但也可能浪费资源。因此,合理配置推测执行阈值是必要的。 8. **Shuffle阶段优化**: 优化 Shuffle 阶段的...

    Hadoop源代码分析(Task的内部类和辅助类)

    在Hadoop框架中,`Task`类是处理数据的核心组件之一,它包括`MapTask`和`ReduceTask`两种类型,分别负责数据的映射处理和归约处理。本文将深入剖析`Task`类中的内部类及其辅助类,旨在理解这些类如何协同工作以支持...

    hadoop作业调优参数整理及原理

    1. **MapTask运行内部原理** - **MapOutputBuffer**:每个MapTask都有一个内存缓冲区,用于暂存计算结果。默认大小为100MB,可使用`io.sort.mb`参数调整。 - **Sort与Spill**:当缓冲区达到80%满时(默认阈值,由`...

    Hadoop大数据期末考试重点

    21. **Mapper类**:Hadoop提供的Mapper类是实现Map阶段逻辑的基础类。 以上是对Hadoop大数据期末考试重点内容的详细解读,涵盖了Hadoop的分布式文件系统HDFS、MapReduce计算模型以及相关配置和操作细节,考生需要对...

    Hadoop技术-MapReduce工作原理.pptx

    **MapTask并行度**:MapTask的数量由InputFormat切片机制决定,直接影响作业的并行度和效率。适当的切片大小能最大化并行度,避免过多的磁盘I/O。 **ReduceTask运行流程:** 1. **数据拷贝**:Reduce进程启动数据...

    Hadoop从入门到上手企业开发

    近百节课视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,...064 源代码跟踪查看Map Task和Reduce Task数目的个数 065 回顾MapReduce执行过程以及MapReduce核心 066 Hadoop MapReduce框架数据类型讲解 067

    hadoop课后题带答案

    9. MapReduce工作流程:包括分片、格式化数据源、MapTask执行、Shuffle过程、ReduceTask执行和写入文件等步骤。 10. Partitioner:Partitioner的作用是将key均匀分布到不同的ReduceTask上,以优化并行计算。 11. ...

    Hadoop源码分析视频下载

    - 源码解析:深入Hadoop源码,研究如NameNode、DataNode、MapTask和ReduceTask等关键类的功能实现。 - 故障恢复和容错机制:探讨Hadoop如何处理硬件故障,保持数据完整性。 - 性能调优:学习如何通过调整参数和...

    hadoop 源码解析_yarn源码解析

    MR 程序执行过程中,会生成多个 Task 任务,包括 MapTask 和 ReduceTask。Task 任务会被分配到不同的 NodeManager 节点上执行。 8. MapTask MapTask 是 MR 程序的映射阶段,负责将输入数据映射到键值对。 9. ...

    hadoop-2.10.0-src.tar.gz

    源码中,MapTask和ReduceTask的执行流程值得深入分析。 四、源码学习价值 阅读Hadoop 2.10.0的源码,可以帮助我们: 1. 理解Hadoop的内部工作机制,提升问题排查能力。 2. 学习分布式系统的设计与实现,为自定义...

Global site tag (gtag.js) - Google Analytics