public void run(JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException { this.umbilical = umbilical; job.setBoolean("mapred.skip.on", isSkipping()); if (isMapOrReduce()) { copyPhase = getProgress().addPhase("copy"); sortPhase = getProgress().addPhase("sort"); reducePhase = getProgress().addPhase("reduce"); } // start thread that will handle communication with parent TaskReporter reporter = new TaskReporter(getProgress(), umbilical, jvmContext); reporter.startCommunicationThread(); boolean useNewApi = job.getUseNewReducer(); initialize(job, getJobID(), reporter, useNewApi); // check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; } // Initialize the codec codec = initCodec(); boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local")); if (!isLocal) { reduceCopier = new ReduceCopier(umbilical, job, reporter); if (!reduceCopier.fetchOutputs()) { if(reduceCopier.mergeThrowable instanceof FSError) { throw (FSError)reduceCopier.mergeThrowable; } throw new IOException("Task: " + getTaskID() + " - The reduce copier failed", reduceCopier.mergeThrowable); } } copyPhase.complete(); // copy is already complete setPhase(TaskStatus.Phase.SORT); statusUpdate(umbilical); final FileSystem rfs = FileSystem.getLocal(job).getRaw(); RawKeyValueIterator rIter = isLocal ? 
Merger.merge(job, rfs, job.getMapOutputKeyClass(), job.getMapOutputValueClass(), codec, getMapFiles(rfs, true), !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100), new Path(getTaskID().toString()), job.getOutputKeyComparator(), reporter, spilledRecordsCounter, null) : reduceCopier.createKVIterator(job, rfs, reporter); // free up the data structures mapOutputFilesOnDisk.clear(); sortPhase.complete(); // sort is complete setPhase(TaskStatus.Phase.REDUCE); statusUpdate(umbilical); Class keyClass = job.getMapOutputKeyClass(); Class valueClass = job.getMapOutputValueClass(); RawComparator comparator = job.getOutputValueGroupingComparator(); if (useNewApi) { runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } else { runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } done(umbilical, reporter); }
fetchOutputs()
public boolean fetchOutputs() throws IOException { int totalFailures = 0; int numInFlight = 0, numCopied = 0; DecimalFormat mbpsFormat = new DecimalFormat("0.00"); final Progress copyPhase = reduceTask.getProgress().phase(); LocalFSMerger localFSMergerThread = null; InMemFSMergeThread inMemFSMergeThread = null; GetMapEventsThread getMapEventsThread = null; for (int i = 0; i < numMaps; i++) { copyPhase.addPhase(); // add sub-phase per file } copiers = new ArrayList<MapOutputCopier>(numCopiers); // start all the copying threads for (int i=0; i < numCopiers; i++) { MapOutputCopier copier = new MapOutputCopier(conf, reporter, reduceTask.getJobTokenSecret()); copiers.add(copier); copier.start(); } //start the on-disk-merge thread localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys); //start the in memory merger thread inMemFSMergeThread = new InMemFSMergeThread(); localFSMergerThread.start(); inMemFSMergeThread.start(); // start the map events thread getMapEventsThread = new GetMapEventsThread(); getMapEventsThread.start(); // start the clock for bandwidth measurement long startTime = System.currentTimeMillis(); long currentTime = startTime; long lastProgressTime = startTime; long lastOutputTime = 0; // loop until we get all required outputs while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) { int numEventsAtStartOfScheduling; synchronized (copyResultsOrNewEventsLock) { numEventsAtStartOfScheduling = numEventsFetched; } currentTime = System.currentTimeMillis(); boolean logNow = false; if (currentTime - lastOutputTime > MIN_LOG_TIME) { lastOutputTime = currentTime; logNow = true; } if (logNow) { LOG.info(reduceTask.getTaskID() + " Need another " + (numMaps - copiedMapOutputs.size()) + " map output(s) " + "where " + numInFlight + " is already in progress"); } // Put the hash entries for the failed fetches. 
Iterator<MapOutputLocation> locItr = retryFetches.iterator(); while (locItr.hasNext()) { MapOutputLocation loc = locItr.next(); List<MapOutputLocation> locList = mapLocations.get(loc.getHost()); // Check if the list exists. Map output location mapping is cleared // once the jobtracker restarts and is rebuilt from scratch. // Note that map-output-location mapping will be recreated and hence // we continue with the hope that we might find some locations // from the rebuild map. if (locList != null) { // Add to the beginning of the list so that this map is //tried again before the others and we can hasten the //re-execution of this map should there be a problem locList.add(0, loc); } } if (retryFetches.size() > 0) { LOG.info(reduceTask.getTaskID() + ": " + "Got " + retryFetches.size() + " map-outputs from previous failures"); } // clear the "failed" fetches hashmap retryFetches.clear(); // now walk through the cache and schedule what we can int numScheduled = 0; int numDups = 0; synchronized (scheduledCopies) { // Randomize the map output locations to prevent // all reduce-tasks swamping the same tasktracker List<String> hostList = new ArrayList<String>(); hostList.addAll(mapLocations.keySet()); Collections.shuffle(hostList, this.random); Iterator<String> hostsItr = hostList.iterator(); while (hostsItr.hasNext()) { String host = hostsItr.next(); List<MapOutputLocation> knownOutputsByLoc = mapLocations.get(host); // Check if the list exists. Map output location mapping is // cleared once the jobtracker restarts and is rebuilt from // scratch. // Note that map-output-location mapping will be recreated and // hence we continue with the hope that we might find some // locations from the rebuild map and add then for fetching. 
if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) { continue; } //Identify duplicate hosts here if (uniqueHosts.contains(host)) { numDups += knownOutputsByLoc.size(); continue; } Long penaltyEnd = penaltyBox.get(host); boolean penalized = false; if (penaltyEnd != null) { if (currentTime < penaltyEnd.longValue()) { penalized = true; } else { penaltyBox.remove(host); } } if (penalized) continue; synchronized (knownOutputsByLoc) { locItr = knownOutputsByLoc.iterator(); while (locItr.hasNext()) { MapOutputLocation loc = locItr.next(); // Do not schedule fetches from OBSOLETE maps if (obsoleteMapIds.contains(loc.getTaskAttemptId())) { locItr.remove(); continue; } uniqueHosts.add(host); scheduledCopies.add(loc); locItr.remove(); // remove from knownOutputs numInFlight++; numScheduled++; break; //we have a map from this host } } } scheduledCopies.notifyAll(); } if (numScheduled > 0 || logNow) { LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled + " outputs (" + penaltyBox.size() + " slow hosts and" + numDups + " dup hosts)"); } if (penaltyBox.size() > 0 && logNow) { LOG.info("Penalized(slow) Hosts: "); for (String host : penaltyBox.keySet()) { LOG.info(host + " Will be considered after: " + ((penaltyBox.get(host) - currentTime)/1000) + " seconds."); } } // if we have no copies in flight and we can't schedule anything // new, just wait for a bit try { if (numInFlight == 0 && numScheduled == 0) { // we should indicate progress as we don't want TT to think // we're stuck and kill us reporter.progress(); Thread.sleep(5000); } } catch (InterruptedException e) { } // IGNORE while (numInFlight > 0 && mergeThrowable == null) { LOG.debug(reduceTask.getTaskID() + " numInFlight = " + numInFlight); //the call to getCopyResult will either //1) return immediately with a null or a valid CopyResult object, // or //2) if the numInFlight is above maxInFlight, return with a // CopyResult object after getting a notification from a // fetcher thread, //So, when 
getCopyResult returns null, we can be sure that //we aren't busy enough and we should go and get more mapcompletion //events from the tasktracker CopyResult cr = getCopyResult(numInFlight, numEventsAtStartOfScheduling); if (cr == null) { break; } if (cr.getSuccess()) { // a successful copy numCopied++; lastProgressTime = System.currentTimeMillis(); reduceShuffleBytes.increment(cr.getSize()); long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1; float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024); float transferRate = mbs/secsSinceStart; copyPhase.startNextPhase(); copyPhase.setStatus("copy (" + numCopied + " of " + numMaps + " at " + mbpsFormat.format(transferRate) + " MB/s)"); // Note successful fetch for this mapId to invalidate // (possibly) old fetch-failures fetchFailedMaps.remove(cr.getLocation().getTaskId()); } else if (cr.isObsolete()) { //ignore LOG.info(reduceTask.getTaskID() + " Ignoring obsolete copy result for Map Task: " + cr.getLocation().getTaskAttemptId() + " from host: " + cr.getHost()); } else { retryFetches.add(cr.getLocation()); // note the failed-fetch TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId(); TaskID mapId = cr.getLocation().getTaskId(); totalFailures++; Integer noFailedFetches = mapTaskToFailedFetchesMap.get(mapTaskId); noFailedFetches = (noFailedFetches == null) ? 
1 : (noFailedFetches + 1); mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches); LOG.info("Task " + getTaskID() + ": Failed fetch #" + noFailedFetches + " from " + mapTaskId); if (noFailedFetches >= abortFailureLimit) { LOG.fatal(noFailedFetches + " failures downloading " + getTaskID() + "."); umbilical.shuffleError(getTaskID(), "Exceeded the abort failure limit;" + " bailing-out.", jvmContext); } checkAndInformJobTracker(noFailedFetches, mapTaskId, cr.getError().equals(CopyOutputErrorType.READ_ERROR)); // note unique failed-fetch maps if (noFailedFetches == maxFetchFailuresBeforeReporting) { fetchFailedMaps.add(mapId); // did we have too many unique failed-fetch maps? // and did we fail on too many fetch attempts? // and did we progress enough // or did we wait for too long without any progress? // check if the reducer is healthy boolean reducerHealthy = (((float)totalFailures / (totalFailures + numCopied)) < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT); // check if the reducer has progressed enough boolean reducerProgressedEnough = (((float)numCopied / numMaps) >= MIN_REQUIRED_PROGRESS_PERCENT); // check if the reducer is stalled for a long time // duration for which the reducer is stalled int stallDuration = (int)(System.currentTimeMillis() - lastProgressTime); // duration for which the reducer ran with progress int shuffleProgressDuration = (int)(lastProgressTime - startTime); // min time the reducer should run without getting killed int minShuffleRunDuration = (shuffleProgressDuration > maxMapRuntime) ? 
shuffleProgressDuration : maxMapRuntime; boolean reducerStalled = (((float)stallDuration / minShuffleRunDuration) >= MAX_ALLOWED_STALL_TIME_PERCENT); // kill if not healthy and has insufficient progress if ((fetchFailedMaps.size() >= maxFailedUniqueFetches || fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size())) && !reducerHealthy && (!reducerProgressedEnough || reducerStalled)) { LOG.fatal("Shuffle failed with too many fetch failures " + "and insufficient progress!" + "Killing task " + getTaskID() + "."); umbilical.shuffleError(getTaskID(), "Exceeded MAX_FAILED_UNIQUE_FETCHES;" + " bailing-out.", jvmContext); } } currentTime = System.currentTimeMillis(); long currentBackOff = (long)(INITIAL_PENALTY * Math.pow(PENALTY_GROWTH_RATE, noFailedFetches)); penaltyBox.put(cr.getHost(), currentTime + currentBackOff); LOG.warn(reduceTask.getTaskID() + " adding host " + cr.getHost() + " to penalty box, next contact in " + (currentBackOff/1000) + " seconds"); } uniqueHosts.remove(cr.getHost()); numInFlight--; } } // all done, inform the copiers to exit exitGetMapEvents= true; try { getMapEventsThread.join(); LOG.info("getMapsEventsThread joined."); } catch (InterruptedException ie) { LOG.info("getMapsEventsThread threw an exception: " + StringUtils.stringifyException(ie)); } synchronized (copiers) { synchronized (scheduledCopies) { for (MapOutputCopier copier : copiers) { copier.interrupt(); } copiers.clear(); } } // copiers are done, exit and notify the waiting merge threads synchronized (mapOutputFilesOnDisk) { exitLocalFSMerge = true; mapOutputFilesOnDisk.notify(); } ramManager.close(); //Do a merge of in-memory files (if there are any) if (mergeThrowable == null) { try { // Wait for the on-disk merge to complete localFSMergerThread.join(); LOG.info("Interleaved on-disk merge complete: " + mapOutputFilesOnDisk.size() + " files left."); //wait for an ongoing merge (if it is in flight) to complete inMemFSMergeThread.join(); LOG.info("In-memory merge complete: " + 
mapOutputsFilesInMemory.size() + " files left."); } catch (InterruptedException ie) { LOG.warn(reduceTask.getTaskID() + " Final merge of the inmemory files threw an exception: " + StringUtils.stringifyException(ie)); // check if the last merge generated an error if (mergeThrowable != null) { mergeThrowable = ie; } return false; } } return mergeThrowable == null && copiedMapOutputs.size() == numMaps; }
reducer 读数据和 map 一样是通过 iterator 进行的,不同的是,reducer 是边排序(归并)边读数据的。
private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewReducer(JobConf job, final TaskUmbilicalProtocol umbilical, final TaskReporter reporter, RawKeyValueIterator rIter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass ) throws IOException,InterruptedException, ClassNotFoundException { // wrap value iterator to report progress. final RawKeyValueIterator rawIter = rIter; rIter = new RawKeyValueIterator() { public void close() throws IOException { rawIter.close(); } public DataInputBuffer getKey() throws IOException { return rawIter.getKey(); } public Progress getProgress() { return rawIter.getProgress(); } public DataInputBuffer getValue() throws IOException { return rawIter.getValue(); } public boolean next() throws IOException { boolean ret = rawIter.next(); reducePhase.set(rawIter.getProgress().get()); reporter.progress(); return ret; } }; // make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID()); // make a reducer org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getReducerClass(), job); org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output = (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>) outputFormat.getRecordWriter(taskContext); org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(output, reduceOutputCounter); job.setBoolean("mapred.skip.on", isSkipping()); org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, getTaskID(), rIter, reduceInputKeyCounter, reduceInputValueCounter, trackedRW, committer, reporter, comparator, keyClass, valueClass); reducer.run(reducerContext); output.close(reducerContext); }
相关推荐
在Hadoop大数据平台中,数据本地性(Data Locality)是一个关键的概念,它涉及到如何高效地执行ReduceTask,以最大化集群资源的利用并减少网络IO的开销。本主题主要探讨了基于网络IO代价评估的ReduceTask数据本地性...
例如,`org.apache.hadoop.mapred.MapTask`和`org.apache.hadoop.mapred.ReduceTask`分别对应Map和Reduce任务的实现,开发者可以通过阅读这些源码了解任务执行的详细流程。 7. **工具集成**:有许多开源工具可以...
- 输出收集:MapTask完成后,中间结果通过Shuffle和Sort阶段传递给ReduceTask,ReduceTask调用Reducer生成最终输出。 整个MapReduce过程是高度协调和分布式的,JobTracker、TaskTracker和Task间的通信以及数据传输...
20. **ReduceTask的工作流程**:ReduceTask包含Copy、Merge、Sort、Reduce和Write五个阶段。 21. **Mapper类**:Hadoop提供的Mapper类是实现Map阶段逻辑的基础类。 以上是对Hadoop大数据期末考试重点内容的详细...
TaskTracker 是任务跟踪器,负责运行 Map Task 和 Reduce Task,与 JobTracker 交互,执行命令,并汇报任务状态。 6. Map 和 Reduce 任务: Map 任务负责解析每条数据记录,传递给用户编写的 map(),将 map() 输出...
9. MapReduce工作流程:包括分片、格式化数据源、MapTask执行、Shuffle过程、ReduceTask执行和写入文件等步骤。 10. Partitioner:Partitioner的作用是将key均匀分布到不同的ReduceTask上,以优化并行计算。 11. ...
在Hadoop框架中,`Task`类是处理数据的核心组件之一,它包括`MapTask`和`ReduceTask`两种类型,分别负责数据的映射处理和归约处理。本文将深入剖析`Task`类中的内部类及其辅助类,旨在理解这些类如何协同工作以支持...
在 Hadoop MapReduce 中,可以通过 Job 类的 `setNumReduceTasks(int num)` 方法来设置 Reduce Task 的数量,以适应不同的分区需求。调整 Reduce Task 的数量可以优化系统资源的利用,提高并行处理能力,同时也可以...
4. **数据收集**:map函数的输出通过context.write收集,使用默认的HashPartitioner进行分区,确保键值对被发送到正确的ReduceTask。 5. **内存缓冲与溢写**:数据写入环形内存缓冲区,当达到溢写条件(如默认的80%...
MR 程序执行过程中,会生成多个 Task 任务,包括 MapTask 和 ReduceTask。Task 任务会被分配到不同的 NodeManager 节点上执行。 8. MapTask MapTask 是 MR 程序的映射阶段,负责将输入数据映射到键值对。 9. ...
近百节课视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,...064 源代码跟踪查看Map Task和Reduce Task数目的个数 065 回顾MapReduce执行过程以及MapReduce核心 066 Hadoop MapReduce框架数据类型讲解 067
- 源码解析:深入Hadoop源码,研究如NameNode、DataNode、MapTask和ReduceTask等关键类的功能实现。 - 故障恢复和容错机制:探讨Hadoop如何处理硬件故障,保持数据完整性。 - 性能调优:学习如何通过调整参数和...
* Reduce-task:负责将中间结果进行合并和处理,生成最终结果。 MapReduce 的主要特点包括: * 高可扩展:MapReduce 可以水平扩展,添加更多的计算节点以满足不断增长的数据需求。 * 高性能:MapReduce 可以并行...
源码中,MapTask和ReduceTask的执行流程值得深入分析。 四、源码学习价值 阅读Hadoop 2.10.0的源码,可以帮助我们: 1. 理解Hadoop的内部工作机制,提升问题排查能力。 2. 学习分布式系统的设计与实现,为自定义...
处理逻辑隐藏在代码细节中,没有整体逻辑;中间结果也放在 HDFS 文件系统中;ReduceTask 需要等待所有 MapTask 都完成后才可以开始;时延高,只适用于 Batch 数据处理,对于交互式数据处理、实时数据处理的支持不够。...
数据经过网络传输后,ReduceTask会根据分区规则将数据排序,然后Reducer处理这些数据生成最终结果。 此外,Hadoop的容错机制也是源码分析的重点。例如,如果某个TaskTracker宕机,JobTracker会重新调度其上的任务到...
Task分为Map Task和Reduce Task,分别处理输入数据的映射阶段和数据聚合阶段。通过源码,我们可以深入了解任务分配策略、数据本地化优化、容错机制等关键功能。 Hadoop源码分析还包括对YARN(Yet Another Resource ...
TaskTracker 负责执行每一个任务,包括 MapTask 和 ReduceTask。JobClient 是一个客户端 API,用于将应用程序和配置参数打包成 jar 文件,并将其提交到 JobTracker 服务中。 JobInProgress 是 JobTracker 创建的一...