Hadoop Deep Dive: The Reducer Component in Detail

        In this section we study the Reducer component of the MapReduce programming model.
        As in the Mapper walkthrough, we start from the Reducer source in the new (1.0) API:
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  public class Context 
    extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RawKeyValueIterator input, 
                   Counter inputKeyCounter,
                   Counter inputValueCounter,
                   RecordWriter<KEYOUT,VALUEOUT> output,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   RawComparator<KEYIN> comparator,
                   Class<KEYIN> keyClass,
                   Class<VALUEIN> valueClass
                   ) throws IOException, InterruptedException {
      super(conf, taskid, input, inputKeyCounter, inputValueCounter,
            output, committer, reporter, 
            comparator, keyClass, valueClass);
    }
  }

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
  }
}
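To see the template-method structure above in use, here is a small, self-contained sketch in plain Java (our own simplified stand-in, not Hadoop code; all class and method names below are ours): a base class whose run() drives setup(), a per-key reduce(), and cleanup(), plus a word-count style override that sums the values for each key, just as a typical Reducer subclass would.

```java
import java.util.*;

// Hypothetical, simplified stand-in for Hadoop's Reducer, mirroring its
// template-method shape: run() calls setup(), then reduce() once per key,
// then cleanup(). The default reduce() is the identity function.
class MiniReducer<KIN, VIN, KOUT, VOUT> {
    protected void setup() { /* no-op by default, like Hadoop's setup() */ }
    protected void cleanup() { /* no-op by default, like Hadoop's cleanup() */ }

    @SuppressWarnings("unchecked")
    protected void reduce(KIN key, Iterable<VIN> values,
                          List<Map.Entry<KOUT, VOUT>> out) {
        // Identity: emit every (key, value) pair unchanged.
        for (VIN v : values) {
            out.add(Map.entry((KOUT) key, (VOUT) v));
        }
    }

    public List<Map.Entry<KOUT, VOUT>> run(Map<KIN, List<VIN>> groupedInput) {
        List<Map.Entry<KOUT, VOUT>> out = new ArrayList<>();
        setup();
        for (Map.Entry<KIN, List<VIN>> e : groupedInput.entrySet()) {
            reduce(e.getKey(), e.getValue(), out);
        }
        cleanup();
        return out;
    }
}

// A word-count style override: sum the values grouped under each key.
class SumReducer extends MiniReducer<String, Integer, String, Integer> {
    @Override
    protected void reduce(String key, Iterable<Integer> values,
                          List<Map.Entry<String, Integer>> out) {
        int sum = 0;
        for (int v : values) sum += v;
        out.add(Map.entry(key, sum));
    }
}

public class Demo {
    public static void main(String[] args) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        grouped.put("hadoop", List.of(1, 1, 1));
        grouped.put("reduce", List.of(1, 1));
        System.out.println(new SumReducer().run(grouped)); // [hadoop=3, reduce=2]
    }
}
```

Note that application code only overrides reduce() (and optionally setup()/cleanup()); the run() driver is inherited unchanged, which is exactly the pattern the Hadoop listing above encourages.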

          The methods in Reducer mirror those in Mapper, and they are used in much the same way.
          In the MapReduce programming model, the Mapper and Reducer components correspond to Map Tasks and Reduce Tasks respectively. For a given job, the user controls the number of Reduce Tasks directly (the reducer count can even be set to zero), but cannot set the number of Map Tasks by hand; the mapper count can only be influenced indirectly through configuration parameters (see the InputFormat component for details).
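As a sketch of what this looks like in a job driver (assuming the new-API org.apache.hadoop.mapreduce.Job class as documented; not compiled here):

```java
// Sketch of a job driver, assuming the org.apache.hadoop.mapreduce.Job API.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "reduce-demo");
        // The reducer count is under user control; setting it to 0 makes
        // this a map-only job whose map output goes straight to the
        // OutputFormat with no shuffle or reduce phase.
        job.setNumReduceTasks(2);
        // There is no corresponding setter for the map-task count in the
        // new API: it equals the number of input splits computed by the
        // job's InputFormat.
    }
}
```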

         However, before reduce() actually runs the user's business logic, the reducer-side shuffle phase executes first. Its flow is shown in the diagram below:

  [Figure: reducer-side shuffle flow]
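To make the shuffle concrete, here is a minimal plain-Java model (our own simplification, not Hadoop code): each map task emits a run of key/value pairs; the reducer side collects the runs, sorts by key, and groups all values for the same key into one list, which is exactly the Iterable&lt;VALUEIN&gt; that reduce() later receives per key.

```java
import java.util.*;

public class ShuffleSketch {
    // Simplified stand-in for the copy -> merge-sort -> group steps of the
    // reducer-side shuffle. A TreeMap keeps the keys sorted, and all values
    // emitted for the same key (across all map outputs) land in one list.
    static TreeMap<String, List<Integer>> shuffle(
            List<List<Map.Entry<String, Integer>>> mapOutputs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> run : mapOutputs) {
            for (Map.Entry<String, Integer> e : run) {
                grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                       .add(e.getValue());
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Two "map task" outputs; both emitted values for key "a".
        List<Map.Entry<String, Integer>> run1 =
            List.of(Map.entry("a", 1), Map.entry("b", 1));
        List<Map.Entry<String, Integer>> run2 =
            List.of(Map.entry("a", 2), Map.entry("c", 1));
        System.out.println(shuffle(List.of(run1, run2)));
        // {a=[1, 2], b=[1], c=[1]}
    }
}
```

Only after this grouped, key-sorted view is built does the framework invoke reduce() once per key, as the run() loop in the listing above shows.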