getMapOutput是ReduceTask.MapOutputCopier.copyOutput下面的方法。(MapOutputCopier是Thread)
其中copyOutput只做了数据基本校验,真正的逻辑在getMapOutput方法中。
getMapOutput里,注释部分已经说得很清楚,后面部分做了判断,是shuffle到mem还是disk。
再分别调用shuffleInMemory、shuffleToDisk,这里shuffleInMemory只是保存了数据,应该还没有sort。
用到URLConnection的inputStream来传数据,cp是走的HTTP吗?
/** * Get the map output into a local file (either in the inmemory fs or on the * local fs) from the remote server. * We use the file system so that we generate checksum files on the data. * @param mapOutputLoc map-output to be fetched * @param filename the filename to write the data into * @param connectionTimeout number of milliseconds for connection timeout * @param readTimeout number of milliseconds for read timeout * @return the path of the file that got created * @throws IOException when something goes wrong */ private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, Path filename, int reduce) throws IOException, InterruptedException { // Connect URL url = mapOutputLoc.getOutputLocation(); URLConnection connection = url.openConnection(); InputStream input = setupSecureConnection(mapOutputLoc, connection); // Validate header from map output TaskAttemptID mapId = null; try { mapId = TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK)); } catch (IllegalArgumentException ia) { LOG.warn("Invalid map id ", ia); return null; } TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId(); if (!mapId.equals(expectedMapId)) { LOG.warn("data from wrong map:" + mapId + " arrived to reduce task " + reduce + ", where as expected map output should be from " + expectedMapId); return null; } long decompressedLength = Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH)); long compressedLength = Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH)); if (compressedLength < 0 || decompressedLength < 0) { LOG.warn(getName() + " invalid lengths in map output header: id: " + mapId + " compressed len: " + compressedLength + ", decompressed len: " + decompressedLength); return null; } int forReduce = (int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK)); if (forReduce != reduce) { LOG.warn("data for the wrong reduce: " + forReduce + " with compressed len: " + compressedLength + ", decompressed len: " + decompressedLength + " arrived to reduce task " + reduce); return null; } if (LOG.isDebugEnabled()) { LOG.debug("header: " + mapId + ", compressed len: " + compressedLength + ", decompressed len: " + decompressedLength); } //We will put a file in memory if it meets certain criteria: //1. The size of the (decompressed) file should be less than 25% of // the total inmem fs //2. There is space available in the inmem fs // Check if this map-output can be saved in-memory boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); // Shuffle MapOutput mapOutput = null; if (shuffleInMemory) { if (LOG.isDebugEnabled()) { LOG.debug("Shuffling " + decompressedLength + " bytes (" + compressedLength + " raw bytes) " + "into RAM from " + mapOutputLoc.getTaskAttemptId()); } mapOutput = shuffleInMemory(mapOutputLoc, connection, input, (int)decompressedLength, (int)compressedLength); } else { if (LOG.isDebugEnabled()) { LOG.debug("Shuffling " + decompressedLength + " bytes (" + compressedLength + " raw bytes) " + "into Local-FS from " + mapOutputLoc.getTaskAttemptId()); } mapOutput = shuffleToDisk(mapOutputLoc, input, filename, compressedLength); } return mapOutput; }
MapOutput对象其实就是装了map输出数据的对象,因为肯定能装到内存(不然就直接扔disk了),所以一次拉数据就搞定了。
private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc, URLConnection connection, InputStream input, int mapOutputLength, int compressedLength) throws IOException, InterruptedException { // Reserve ram for the map-output boolean createdNow = ramManager.reserve(mapOutputLength, input); // Reconnect if we need to if (!createdNow) { // Reconnect try { connection = mapOutputLoc.getOutputLocation().openConnection(); input = setupSecureConnection(mapOutputLoc, connection); } catch (IOException ioe) { LOG.info("Failed reopen connection to fetch map-output from " + mapOutputLoc.getHost()); // Inform the ram-manager ramManager.closeInMemoryFile(mapOutputLength); ramManager.unreserve(mapOutputLength); throw ioe; } } IFileInputStream checksumIn = new IFileInputStream(input,compressedLength); input = checksumIn; // Are map-outputs compressed? if (codec != null) { decompressor.reset(); input = codec.createInputStream(input, decompressor); } // Copy map-output into an in-memory buffer byte[] shuffleData = new byte[mapOutputLength]; MapOutput mapOutput = new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength); int bytesRead = 0; try { int n = IOUtils.wrappedReadForCompressedData(input, shuffleData, 0, shuffleData.length); while (n > 0) { bytesRead += n; shuffleClientMetrics.inputBytes(n); // indicate we're making progress reporter.progress(); n = IOUtils.wrappedReadForCompressedData(input, shuffleData, bytesRead, shuffleData.length - bytesRead); } if (LOG.isDebugEnabled()) { LOG.debug("Read " + bytesRead + " bytes from map-output for " + mapOutputLoc.getTaskAttemptId()); } input.close(); } catch (IOException ioe) { LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), ioe); // Inform the ram-manager ramManager.closeInMemoryFile(mapOutputLength); ramManager.unreserve(mapOutputLength); // Discard the map-output try { mapOutput.discard(); } catch (IOException ignored) { LOG.info("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId(), ignored); } mapOutput = null; // Close the streams IOUtils.cleanup(LOG, input); // Re-throw readError = true; throw ioe; } // Close the in-memory file ramManager.closeInMemoryFile(mapOutputLength); // Sanity check if (bytesRead != mapOutputLength) { // Inform the ram-manager ramManager.unreserve(mapOutputLength); // Discard the map-output try { mapOutput.discard(); } catch (IOException ignored) { // IGNORED because we are cleaning up LOG.info("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId(), ignored); } mapOutput = null; throw new IOException("Incomplete map output received for " + mapOutputLoc.getTaskAttemptId() + " from " + mapOutputLoc.getOutputLocation() + " (" + bytesRead + " instead of " + mapOutputLength + ")" ); } // TODO: Remove this after a 'fix' for HADOOP-3647 if (LOG.isDebugEnabled()) { if (mapOutputLength > 0) { DataInputBuffer dib = new DataInputBuffer(); dib.reset(shuffleData, 0, shuffleData.length); LOG.debug("Rec #1 from " + mapOutputLoc.getTaskAttemptId() + " -> (" + WritableUtils.readVInt(dib) + ", " + WritableUtils.readVInt(dib) + ") from " + mapOutputLoc.getHost()); } } return mapOutput; }
private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc, InputStream input, Path filename, long mapOutputLength) throws IOException { // Find out a suitable location for the output on local-filesystem Path localFilename = lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), mapOutputLength, conf); MapOutput mapOutput = new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), conf, localFileSys.makeQualified(localFilename), mapOutputLength); // Copy data to local-disk OutputStream output = null; long bytesRead = 0; try { output = rfs.create(localFilename); byte[] buf = new byte[64 * 1024]; int n = -1; try { n = input.read(buf, 0, buf.length); } catch (IOException ioe) { readError = true; throw ioe; } while (n > 0) { bytesRead += n; shuffleClientMetrics.inputBytes(n); output.write(buf, 0, n); // indicate we're making progress reporter.progress(); try { n = input.read(buf, 0, buf.length); } catch (IOException ioe) { readError = true; throw ioe; } } LOG.info("Read " + bytesRead + " bytes from map-output for " + mapOutputLoc.getTaskAttemptId()); output.close(); input.close(); } catch (IOException ioe) { LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), ioe); // Discard the map-output try { mapOutput.discard(); } catch (IOException ignored) { LOG.info("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId(), ignored); } mapOutput = null; // Close the streams IOUtils.cleanup(LOG, input, output); // Re-throw throw ioe; } // Sanity check if (bytesRead != mapOutputLength) { try { mapOutput.discard(); } catch (Exception ioe) { // IGNORED because we are cleaning up LOG.info("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId(), ioe); } catch (Throwable t) { String msg = getTaskID() + " : Failed in shuffle to disk :" + StringUtils.stringifyException(t); reportFatalError(getTaskID(), t, msg); } mapOutput = null; throw new IOException("Incomplete map output received for " + mapOutputLoc.getTaskAttemptId() + " from " + mapOutputLoc.getOutputLocation() + " (" + bytesRead + " instead of " + mapOutputLength + ")" ); } return mapOutput; }
相关推荐
那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据
麒麟win10双系统重新安装win10后麒麟启动菜单看不到解决方法
多邻国Duolingo v6.0.3 高级版.apk
QT网络编程: 实现TCP通讯设置(客户端)
减少重复造轮子,开源微信小程序商城(前后端开源:uniapp+Java)。快速搭建一个属于自己的微信小程序商城。
彩虹云商城 最新彩虹代刷V6.9.0免授权纯净完整版 直接上传源码解压缩后访问域名安装即可,亲测可用 彩虹自助下单系统 安装说明: 上传到空间后直接访问即可根据提示安装。 PHP推荐使用7.0及以上版本 V6.9 1.修复SQL注入漏洞 2.修复后台微信QQ扫码登录 V6.8.5 1.修复亿乐对接 2.新增支持倍数输入框 V6.8 1.更新全新的faka模板 2.新增微信快捷登录 3.新增批量下单功能 4.防CC配置新增滑动验证码模式 5.修复部分地区后台加载错误 6.修复https网站对接http支付接口 7.后台登录支持微信QQ扫码登录
那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据
那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据
网鼎杯
网络安全入门教程(工具版)
科普里控制器调试软件工具使用 win64环境安装
内容概要:本文档详细介绍了GC9503V单片机a-Si TFT LCD驱动器的技术规格,包括主要特点、内部结构图、引脚定义以及系统接口等。GC9503V支持480x864分辨率,16.7百万色显示,无内置GRAM。文章还提供了详细的引脚尺寸、对齐标记尺寸、芯片信息以及接口模式控制的序列实例,如DCS写入命令及其参数。 适合人群:LCD显示屏设计人员、嵌入式系统工程师、电子硬件开发者和技术研究人员。 使用场景及目标:帮助开发者快速理解和应用GC9503V在实际产品中的具体使用方法,掌握LCM与MCU之间的数据交互方式,实现高效的屏幕驱动设计。 其他说明:GalaxyCore公司保留在不事先通知的情况下更改文档内容的权利。
那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据
yolo系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值
那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据
某酒店排水课程设计计算书.doc
那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据
那些年,与你同分同位次的同学都去了哪里?全国各大学在四川2020-2024年各专业最低录取分数及录取位次数据,高考志愿必备参考数据