`
heipark
  • 浏览: 2082523 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop map task中Partitioner执行时机

阅读更多

 

MapTask.runNewMapper() ->

  ...

  if (job.getNumReduceTasks() == 0) {

      // 无reduce,直接写HDFS

      // 这个writer输出时不执行Partitioner.getPartition()方法

     output =  new NewDirectOutputCollector(taskContext, job, umbilical, reporter);

  } else {

       // 执行Reduce函数,写入本地文件

      // 初始化Partitioner
      output = new NewOutputCollector(taskContext, job, umbilical, reporter); 
  }

  // 将output传入mapperContext

  mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                     input, output, committer,
                                                     reporter, split);

 

  mapper.run(mapperContext); <--

 

mapper.run(mapperContext) ->

  while (context.nextKeyValue()) {

      map(context.getCurrentKey(), context.getCurrentValue(), context); <-
    }

 

//map()函数由用户实现,并调用context.write()方法输出
map() ->

  context.write((KEYOUT) key, (VALUEOUT) value); <-

 

write()

  // 实际调用NewOutputCollector.wirte()

  // 先计算key-value的partition,然后执行collect输出数据到内存缓冲区

  collector.collect(key, value, partitioner.getPartition(key, value, partitions));

 

结论:

  1. Partitioner是在map函数执行context.write()时被调用。
  2. 如果没有Reduce函数,则MapTask不会执行Partitioner.getPartition()方法。

 

 --end

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics