tcxiang

Hadoop MapTask

1. The task asks the job for its configured InputFormat (job.getInputFormat()) and then gets a RecordReader from that InputFormat.

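2. numReduceTasks is read from the conf that was set up earlier. If numReduceTasks > 0, the map output is divided into n partitions (n = numReduceTasks) for the shuffle, so the number of partitions is determined by the number of reduce tasks. If numReduceTasks is 0, the job is clearly map-only and the map writes its output directly.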
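To make point 2 concrete, here is a minimal sketch, assuming the job uses Hadoop's default HashPartitioner. It reproduces that partitioner's formula, `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`, without any Hadoop dependency, and mirrors the `numReduceTasks > 0` branch in runOldMapper. The class and method names (`PartitionSketch`, `partitionFor`, `collectorFor`) are hypothetical and only for illustration.

```java
// Hypothetical illustration; no Hadoop classes are used.
class PartitionSketch {

    // Same arithmetic as Hadoop's default HashPartitioner: the result is always
    // in [0, numReduceTasks), so the partition count equals the reduce count.
    static int partitionFor(Object key, int numReduceTasks) {
        if (numReduceTasks <= 0) {
            // A map-only job has no partitions at all.
            throw new IllegalArgumentException("map-only job: no partitions");
        }
        // Masking the sign bit keeps negative hash codes in range.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Mirrors the branch in runOldMapper that picks the output collector.
    static String collectorFor(int numReduceTasks) {
        return numReduceTasks > 0 ? "MapOutputBuffer" : "DirectMapOutputCollector";
    }
}
```

Masking with `Integer.MAX_VALUE` instead of calling `Math.abs` avoids the edge case where `Math.abs(Integer.MIN_VALUE)` is still negative.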

 

  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runOldMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, InterruptedException,
                             ClassNotFoundException {
    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
           splitIndex.getStartOffset());

    updateJobWithSplit(job, inputSplit);
    reporter.setInputSplit(inputSplit);

    RecordReader<INKEY,INVALUE> rawIn =                  // open input
      job.getInputFormat().getRecordReader(inputSplit, job, reporter);
    RecordReader<INKEY,INVALUE> in = isSkipping() ? 
        new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
        new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
    job.setBoolean("mapred.skip.on", isSkipping());


    int numReduceTasks = conf.getNumReduceTasks();
    LOG.info("numReduceTasks: " + numReduceTasks);
    MapOutputCollector collector = null;
    if (numReduceTasks > 0) {
      collector = new MapOutputBuffer(umbilical, job, reporter);
    } else { 
      collector = new DirectMapOutputCollector(umbilical, job, reporter);
    }
    MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
      ReflectionUtils.newInstance(job.getMapRunnerClass(), job);

    try {
      runner.run(in, new OldOutputCollector(collector, conf), reporter);
      collector.flush();
    } finally {
      //close
      in.close();                               // close input
      collector.close();
    }
  }
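runOldMapper opens the raw RecordReader from the InputFormat, wraps it (TrackedRecordReader, or SkippingRecordReader when skipping is on), drives it through the MapRunnable, and closes the reader in a `finally` block. The sketch below shows only that wrap, loop, and close shape, assuming a wrapper whose job is to count records as it delegates; the interfaces and classes (`DemoReader`, `ListReader`, `CountingReader`, `OldMapperLoopSketch`) are hypothetical stand-ins, not Hadoop's API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical minimal reader abstraction (not Hadoop's RecordReader).
interface DemoReader<K, V> {
    Map.Entry<K, V> next(); // returns null at the end of the split
    void close();
}

// Plays the role of the raw reader returned by the InputFormat.
class ListReader<K, V> implements DemoReader<K, V> {
    private final Iterator<Map.Entry<K, V>> it;
    boolean closed;

    ListReader(List<Map.Entry<K, V>> records) { this.it = records.iterator(); }

    public Map.Entry<K, V> next() { return it.hasNext() ? it.next() : null; }

    public void close() { closed = true; }
}

// Decorator in the spirit of TrackedRecordReader: delegates every read and counts records.
class CountingReader<K, V> implements DemoReader<K, V> {
    private final DemoReader<K, V> raw;
    long recordsRead;

    CountingReader(DemoReader<K, V> raw) { this.raw = raw; }

    public Map.Entry<K, V> next() {
        Map.Entry<K, V> e = raw.next();
        if (e != null) recordsRead++;
        return e;
    }

    public void close() { raw.close(); }
}

class OldMapperLoopSketch {
    // Reads every record, hands it to the map function, and always closes the reader,
    // like the try/finally in runOldMapper.
    static <K, V> List<String> run(DemoReader<K, V> in,
                                   BiConsumer<Map.Entry<K, V>, List<String>> map) {
        List<String> collected = new ArrayList<>();
        try {
            for (Map.Entry<K, V> e = in.next(); e != null; e = in.next()) {
                map.accept(e, collected);
            }
        } finally {
            in.close(); // closing the wrapper also closes the raw reader
        }
        return collected;
    }
}
```

Because the wrapper implements the same interface as the raw reader, the loop does not need to know whether it is reading through a tracking or a skipping reader, which is the point of choosing the wrapper with `isSkipping()`.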

 

Q. The method above calls getSplitDetails to obtain the InputSplit. I don't quite follow this part:

  private <T> T getSplitDetails(Path file, long offset)
      throws IOException {
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream inFile = fs.open(file);
    inFile.seek(offset);
    String className = Text.readString(inFile);
    Class<T> cls;
    try {
      cls = (Class<T>) conf.getClassByName(className);
    } catch (ClassNotFoundException ce) {
      IOException wrap = new IOException("Split class " + className +
                                          " not found");
      wrap.initCause(ce);
      throw wrap;
    }
    SerializationFactory factory = new SerializationFactory(conf);
    Deserializer<T> deserializer = (Deserializer<T>) factory.getDeserializer(cls);
    deserializer.open(inFile);
    T split = deserializer.deserialize(null);
    long pos = inFile.getPos();
    getCounters().findCounter(Task.Counter.SPLIT_RAW_BYTES).increment(pos - offset);
    inFile.close();
    return split;
  }
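Reading the code, getSplitDetails seems to work like this: the job's split file holds, for each split, the split's class name followed by the serialized split object, and splitIndex supplies the file location and the byte offset where this task's entry starts. The method seeks to that offset, reads the class name with Text.readString, resolves the class, asks SerializationFactory for a matching Deserializer, and deserializes the split. The bytes consumed (pos - offset) go into the SPLIT_RAW_BYTES counter. Below is a self-contained sketch of that layout, assuming plain `DataOutputStream.writeUTF` as a stand-in for Hadoop's Text string encoding and a hypothetical `DemoSplit` (path, start, length) in place of a real split class; the write side is also an illustration, not Hadoop's own split writer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for a split class such as FileSplit: a path plus a byte range.
class DemoSplit {
    final String path;
    final long start;
    final long length;

    DemoSplit(String path, long start, long length) {
        this.path = path;
        this.start = start;
        this.length = length;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(path);
        out.writeLong(start);
        out.writeLong(length);
    }

    static DemoSplit read(DataInput in) throws IOException {
        return new DemoSplit(in.readUTF(), in.readLong(), in.readLong());
    }
}

class SplitFileSketch {
    // Appends one entry (class name, then the split's fields) and returns its start offset,
    // the value a TaskSplitIndex would hand to getSplitDetails.
    static long append(ByteArrayOutputStream file, DemoSplit split) throws IOException {
        long offset = file.size();
        DataOutputStream out = new DataOutputStream(file);
        out.writeUTF(split.getClass().getName()); // getSplitDetails reads this with Text.readString
        split.write(out);
        out.flush();
        return offset;
    }

    // Counterpart of getSplitDetails: jump to the offset, read the class name,
    // resolve the class, then deserialize the split that follows it.
    static DemoSplit readAt(byte[] file, long offset) throws IOException {
        int start = (int) offset;
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(file, start, file.length - start)); // acts like seek(offset)
        String className = in.readUTF();
        Class<?> cls;
        try {
            cls = Class.forName(className);
        } catch (ClassNotFoundException ce) {
            throw new IOException("Split class " + className + " not found", ce);
        }
        if (cls != DemoSplit.class) {
            throw new IOException("No deserializer for " + className);
        }
        return DemoSplit.read(in);
    }
}
```

Storing the class name in front of each entry is what lets one generic method rebuild any split type: the reader does not need to know in advance whether it is looking at a FileSplit or some custom split.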

 

  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) 
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    // start thread that will handle communication with parent
    TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
        jvmContext);
    reporter.startCommunicationThread();
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

 

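With the new API, runNewMapper relies on all the user-defined class names having been stored in the conf. It creates the mapper and the InputFormat reflectively with ReflectionUtils.newInstance, and builds the Mapper.Context through a reflected constructor (this is plain reflection, not a dynamic proxy).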

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
    // make a mapper
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());

    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
          (inputFormat.createRecordReader(split, taskContext), reporter);
    
    job.setBoolean("mapred.skip.on", isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
         mapperContext = null;
    try {
      Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
        org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
        (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
                     Configuration.class,
                     org.apache.hadoop.mapreduce.TaskAttemptID.class,
                     org.apache.hadoop.mapreduce.RecordReader.class,
                     org.apache.hadoop.mapreduce.RecordWriter.class,
                     org.apache.hadoop.mapreduce.OutputCommitter.class,
                     org.apache.hadoop.mapreduce.StatusReporter.class,
                     org.apache.hadoop.mapreduce.InputSplit.class});

      // get an output object
      if (job.getNumReduceTasks() == 0) {
         output =
           new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
      } else {
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
      }

      mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                     input, output, committer,
                                                     reporter, split);

      input.initialize(split, mapperContext);
      mapper.run(mapperContext);
      input.close();
      output.close(mapperContext);
    } catch (NoSuchMethodException e) {
      throw new IOException("Can't find Context constructor", e);
    } catch (InstantiationException e) {
      throw new IOException("Can't create Context", e);
    } catch (InvocationTargetException e) {
      throw new IOException("Can't invoke Context constructor", e);
    } catch (IllegalAccessException e) {
      throw new IOException("Can't invoke Context constructor", e);
    }
  }
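The reflective construction in runNewMapper boils down to two steps: look up a class from a name stored in the configuration and instantiate it with its no-argument constructor, then find a constructor by its exact parameter types and call it. Here is a minimal sketch of both steps using only `java.lang.reflect`, assuming a `Map<String, String>` in place of the JobConf and a hypothetical key `demo.mapper.class`; `DemoMapper`, `UpperCaseMapper`, and `DemoContext` are illustrative names. Hadoop's `ReflectionUtils.newInstance` does extra work on top of this, such as passing the configuration to Configurable objects, which is not reproduced here.

```java
import java.lang.reflect.Constructor;
import java.util.Map;

// Hypothetical user-facing mapper type.
interface DemoMapper {
    String map(String line);
}

// A "user-defined" mapper whose class name would be recorded in the conf.
class UpperCaseMapper implements DemoMapper {
    public UpperCaseMapper() {}

    public String map(String line) { return line.toUpperCase(); }
}

// Hypothetical context object, built the way runNewMapper builds Mapper.Context.
class DemoContext {
    final DemoMapper mapper;
    final String taskId;

    public DemoContext(DemoMapper mapper, String taskId) {
        this.mapper = mapper;
        this.taskId = taskId;
    }
}

class ReflectiveMapperSketch {
    // Step 1: class name from the conf, then a no-arg constructor (cf. ReflectionUtils.newInstance).
    static DemoMapper newMapper(Map<String, String> conf) throws Exception {
        Class<?> cls = Class.forName(conf.get("demo.mapper.class"));
        return (DemoMapper) cls.getDeclaredConstructor().newInstance();
    }

    // Step 2: constructor looked up by exact parameter types (cf. Mapper.Context.class.getConstructor).
    static DemoContext newContext(DemoMapper mapper, String taskId) throws Exception {
        Constructor<DemoContext> ctor =
            DemoContext.class.getConstructor(DemoMapper.class, String.class);
        return ctor.newInstance(mapper, taskId);
    }
}
```

The reflected constructor lookup is also why runNewMapper has to catch NoSuchMethodException, InstantiationException, InvocationTargetException, and IllegalAccessException: each reflective step can fail at run time instead of at compile time.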
