hadoop MapTask
1.通过Job的inputFormmat获得对应InputFormat然后获得RecordReader
2.numReduceTasks从前面conf计算的得到,numReduceTasks>0就有n个partition来做shuffle,说明partition的个数是由reduceNum决定的。numReduceTasks为0,则明显是map直接输出的任务。
private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, InterruptedException, ClassNotFoundException { InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); updateJobWithSplit(job, inputSplit); reporter.setInputSplit(inputSplit); RecordReader<INKEY,INVALUE> rawIn = // open input job.getInputFormat().getRecordReader(inputSplit, job, reporter); RecordReader<INKEY,INVALUE> in = isSkipping() ? new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) : new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter); job.setBoolean("mapred.skip.on", isSkipping()); int numReduceTasks = conf.getNumReduceTasks(); LOG.info("numReduceTasks: " + numReduceTasks); MapOutputCollector collector = null; if (numReduceTasks > 0) { collector = new MapOutputBuffer(umbilical, job, reporter); } else { collector = new DirectMapOutputCollector(umbilical, job, reporter); } MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job); try { runner.run(in, new OldOutputCollector(collector, conf), reporter); collector.flush(); } finally { //close in.close(); // close input collector.close(); } }
Q.前面方法调用getSplitDetail是为了获得InputSplit,这里有点看不懂
private <T> T getSplitDetails(Path file, long offset) throws IOException { FileSystem fs = file.getFileSystem(conf); FSDataInputStream inFile = fs.open(file); inFile.seek(offset); String className = Text.readString(inFile); Class<T> cls; try { cls = (Class<T>) conf.getClassByName(className); } catch (ClassNotFoundException ce) { IOException wrap = new IOException("Split class " + className + " not found"); wrap.initCause(ce); throw wrap; } SerializationFactory factory = new SerializationFactory(conf); Deserializer<T> deserializer = (Deserializer<T>) factory.getDeserializer(cls); deserializer.open(inFile); T split = deserializer.deserialize(null); long pos = inFile.getPos(); getCounters().findCounter(Task.Counter.SPLIT_RAW_BYTES).increment(pos - offset); inFile.close(); return split; }
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { this.umbilical = umbilical; // start thread that will handle communication with parent TaskReporter reporter = new TaskReporter(getProgress(), umbilical, jvmContext); reporter.startCommunicationThread(); boolean useNewApi = job.getUseNewMapper(); initialize(job, getJobID(), reporter, useNewApi); // check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; } if (useNewApi) { runNewMapper(job, splitMetaInfo, umbilical, reporter); } else { runOldMapper(job, splitMetaInfo, umbilical, reporter); } done(umbilical, reporter); }
新api下的runMapper,将各种自定义的class信息都保存到conf里了,用动态代理的方式new mapper出来。
private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException { // make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID()); // make a mapper org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // make the input format org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // rebuild the input split org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (inputFormat.createRecordReader(split, taskContext), reporter); job.setBoolean("mapred.skip.on", isSkipping()); org.apache.hadoop.mapreduce.RecordWriter output = null; org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = null; try { Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor = org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor (new Class[]{org.apache.hadoop.mapreduce.Mapper.class, Configuration.class, org.apache.hadoop.mapreduce.TaskAttemptID.class, org.apache.hadoop.mapreduce.RecordReader.class, org.apache.hadoop.mapreduce.RecordWriter.class, org.apache.hadoop.mapreduce.OutputCommitter.class, org.apache.hadoop.mapreduce.StatusReporter.class, org.apache.hadoop.mapreduce.InputSplit.class}); // get an output object if (job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new NewOutputCollector(taskContext, job, umbilical, reporter); } mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(), input, output, committer, reporter, split); input.initialize(split, mapperContext); mapper.run(mapperContext); input.close(); output.close(mapperContext); } catch (NoSuchMethodException e) { throw new IOException("Can't find Context constructor", e); } catch (InstantiationException e) { throw new IOException("Can't create Context", e); } catch (InvocationTargetException e) { throw new IOException("Can't invoke Context constructor", e); } catch (IllegalAccessException e) { throw new IOException("Can't invoke Context constructor", e); } }
相关推荐
### Hadoop MapTask辅助类源代码分析 #### 一、概述 Hadoop作为一个分布式计算框架,其核心组件之一是MapReduce。MapReduce负责处理大规模数据集的并行运算任务,而MapTask作为MapReduce的核心组成部分之一,其...
Hadoop源代码分析(MapTask) Hadoop的MapTask类是Hadoop MapReduce框架中的一部分,负责执行Map任务。MapTask类继承自Task类,是MapReduce框架中的一个重要组件。本文将对MapTask类的源代码进行分析,了解其内部...
### Hadoop源代码分析——MapTask辅助类输出机制详解 #### 概述 本文将深入探讨Hadoop MapReduce框架中的MapTask辅助类中与键值对(Key-Value,简称KV)输出相关的源代码实现细节。这部分内容对于理解Hadoop内部...
MapTask工作机制是Hadoop MapReduce框架中的一个关键组件,负责将输入数据处理并输出结果。本文将详细介绍MapTask工作机制的五个阶段:Read阶段、Map阶段、Collect收集阶段、Spill阶段和Combine阶段。 Read阶段 在...
在Hadoop中,MapTask是MapReduce框架的关键组件,负责执行Mapper阶段的工作。MapTask辅助类,特别是MapOutputBuffer,是Mapper输出数据管理的核心部分。本文将继续深入分析MapOutputBuffer的内部实现,以便理解...
例如,`org.apache.hadoop.mapred.MapTask`和`org.apache.hadoop.mapreduce.ReduceTask`分别对应Map和Reduce任务的实现,开发者可以通过阅读这些源码了解任务执行的详细流程。 7. **工具集成**:有许多开源工具可以...
- MapTask和ReduceTask:Mapper和Combiner(如有配置)由MapTask调用,Reducer由ReduceTask调用。Mapper读取输入数据,经过处理后生成中间结果,若配置了Combiner,则在Mapper本地进行预聚合。ReduceTask接收Map...
TaskTracker 是任务跟踪器,负责运行 Map Task 和 Reduce Task,与 JobTracker 交互,执行命令,并汇报任务状态。 6. Map 和 Reduce 任务: Map 任务负责解析每条数据记录,传递给用户编写的 map(),将 map() 输出...
当某些 Task 运行缓慢时,Hadoop 可以启动额外的 Task 实例来尝试替换它们。这有助于减少整体作业时间,但也可能浪费资源。因此,合理配置推测执行阈值是必要的。 8. **Shuffle阶段优化**: 优化 Shuffle 阶段的...
在Hadoop框架中,`Task`类是处理数据的核心组件之一,它包括`MapTask`和`ReduceTask`两种类型,分别负责数据的映射处理和归约处理。本文将深入剖析`Task`类中的内部类及其辅助类,旨在理解这些类如何协同工作以支持...
1. **MapTask运行内部原理** - **MapOutputBuffer**:每个MapTask都有一个内存缓冲区,用于暂存计算结果。默认大小为100MB,可使用`io.sort.mb`参数调整。 - **Sort与Spill**:当缓冲区达到80%满时(默认阈值,由`...
21. **Mapper类**:Hadoop提供的Mapper类是实现Map阶段逻辑的基础类。 以上是对Hadoop大数据期末考试重点内容的详细解读,涵盖了Hadoop的分布式文件系统HDFS、MapReduce计算模型以及相关配置和操作细节,考生需要对...
**MapTask并行度**:MapTask的数量由InputFormat切片机制决定,直接影响作业的并行度和效率。适当的切片大小能最大化并行度,避免过多的磁盘I/O。 **ReduceTask运行流程:** 1. **数据拷贝**:Reduce进程启动数据...
近百节课视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,...064 源代码跟踪查看Map Task和Reduce Task数目的个数 065 回顾MapReduce执行过程以及MapReduce核心 066 Hadoop MapReduce框架数据类型讲解 067
9. MapReduce工作流程:包括分片、格式化数据源、MapTask执行、Shuffle过程、ReduceTask执行和写入文件等步骤。 10. Partitioner:Partitioner的作用是将key均匀分布到不同的ReduceTask上,以优化并行计算。 11. ...
- 源码解析:深入Hadoop源码,研究如NameNode、DataNode、MapTask和ReduceTask等关键类的功能实现。 - 故障恢复和容错机制:探讨Hadoop如何处理硬件故障,保持数据完整性。 - 性能调优:学习如何通过调整参数和...
MR 程序执行过程中,会生成多个 Task 任务,包括 MapTask 和 ReduceTask。Task 任务会被分配到不同的 NodeManager 节点上执行。 8. MapTask MapTask 是 MR 程序的映射阶段,负责将输入数据映射到键值对。 9. ...
源码中,MapTask和ReduceTask的执行流程值得深入分析。 四、源码学习价值 阅读Hadoop 2.10.0的源码,可以帮助我们: 1. 理解Hadoop的内部工作机制,提升问题排查能力。 2. 学习分布式系统的设计与实现,为自定义...