`
flyingdutchman
  • 浏览: 353270 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Hadoop深入学习:Mapper组件详解

阅读更多
        本节我们主要学习MapReduce编程接口模型中的Mapper组件,主要是学习其中一些的方法,如setup()、map()和cleanup()等方法地使用。
        我们先来看一下新版本中的Mapper代码:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context 
    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN,VALUEIN> reader,
                   RecordWriter<KEYOUT,VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }
  
  /**
   * 在Map Task任务开始执行的时候首先会调用该方法,只执行一次
   * 主要用于全局变量或重量级的操作的初始化,如集成HBase的时候,生成HTablePool
   * 如pool = new HTablePool();
   * 开发者一般可以不override该方法
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * 开发者在该方法中来处理自己需要关注业务逻辑
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);//context.write()执行后开始map断的shuffle处理过程。    
  }

  /**
   * Called once at the end of the task.
   * 在Map Task任务执行结束的时候调用该方法,且只执行一次
   * 该方法用于释放在setup()中初始化的一些重量级的资源
   * 一般情况下,开发者可以不用override该方法
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * 该方法由框架调用,对于初级的Hadoop开发者而言,可以不需要修改该方法,但是对于
   * 资深的Hadoop开发者来说,可以重写该方法以达到完全精确控制整个Mapper的处理流程 
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    //Map Task执行流程的第一步
    setup(context);
    //第二步,循环调用map()方法来专注于开发者的业务逻辑处理
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    //第三步,清除Task的上下文信息或释放全局的重量级的资源
    cleanup(context);
  }
}
        

        我们再来看一看Mapper的处理流程:

       
        在整个Map Task的处理流程中,由几点需要特别注意:
        1)、Map处理的中间结果会以临时数据文件方式被保存在linux的本地文件系统上,而非HDFS文件系统上。
        2)、Map Task处理过的数据会溢写超过内存缓冲区阀值的数据,经排序、spill、和合并操作,经所有的临时的中间数据文件合并成一个大文件和一个索引文件,具体过程详见MapTask详解
  • 大小: 39.7 KB
0
5
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics