本篇博客将以WordCount为例从源码上分析hadoop作业提交流程,所使用的hadoop版本为cdh4.3.0。
hadoop作业提交流程如下图所示:
public static void distribute() throws Exception { /*指定Hadoop 环境的用户名称, 如果不指定会抛org.apache.hadoop.security.AccessControlException,访问受限*/ System.setProperty("HADOOP_USER_NAME", "DQ"); Configuration conf = new Configuration(); //指定执行过程中临时文件存放目录 conf.setStrings("hadoop.tmp.dir", "/home/DQ/hsh_test/"); //指定jobTracker conf.set("mapred.job.tracker", "hadoop01:8021"); //指定作业依赖的jar在本地的路径 conf.set("tmpjars", "/home/david/workspace/haddop-test/thirdlib.jar"); //指定mr jar包在本地的路径 conf.set("mapred.jar", "/home/david/workspace/haddop-test/mr.jar"); conf.setStrings("tmpfiles", "/home/david/workspace/haddop-test/refData/dict.txt"); //指定依赖的归档文件 conf.setStrings("tmparchives", "/home/david/workspace/haddop-test/refData/archives/archives.zip"); //指定分片大小 // long splip_size = 70 * 1024 * 1024; // conf.setLong("mapred.max.split.size", splip_size); Job wordCountjob = Job.getInstance(conf, "wordcount"); wordCountjob.setNumReduceTasks(3); //配置reduec任务的数量 wordCountjob.setInputFormatClass(TextInputFormat.class); wordCountjob.setMapperClass(TokenizerMapper.class); //指定对中间数据进行合并的类 wordCountjob.setCombinerClass(IntSumReducer.class); wordCountjob.setReducerClass(IntSumReducer.class); wordCountjob.setOutputKeyClass(Text.class); wordCountjob.setOutputValueClass(IntWritable.class); // 指定输入数据在hdfs上的存放路径 Path input1 = new Path("hdfs://hadoop01:8020/user/DQ/input1.txt"); Path input2 = new Path("hdfs://hadoop01:8020/user/DQ/input2.txt"); FileInputFormat.addInputPath(wordCountjob, input1); FileInputFormat.addInputPath(wordCountjob, input2); //作业执行结果在hdfs上的存放路径 Path output = new Path("hdfs://hadoop01:8020/user/DQ/output/"+System.currentTimeMillis()); FileOutputFormat.setOutputPath(wordCountjob, output); boolean flag = wordCountjob.waitForCompletion(true); if(flag) System.exit(1); System.exit(0); }
客户端代码中,distribute( )方法创建Configuration 实例conf,并将 jobTracker信息、作业的mr jar包、作业依赖的jar以及依赖的其他文件添加到conf中,然后以conf为参数创建名为wordcount的作业实例wordCountJob,并指定其执行的Mapper和Reducer以及输入输出等信息。wordCountJob调用waitForCompletion(boolean verbose)提交作业,源码如下:
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } if (verbose) { // 如果verbose为true 就打印作业执行信息,否则不打印 jobClient.monitorAndPrintJob(conf, info); } else { info.waitForCompletion(); } return isSuccessful(); }
waitForCompletion( )方法内部判定作业处于新建状态就调用Job的submit( )来提交。submit( )源码如下:
public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); // Connect to the JobTracker and submit the job connect(); info = jobClient.submitJobInternal(conf); super.setJobID(info.getID()); state = JobState.RUNNING; }
submit( )会进一步检查作业的状态是否是DEFINE,如果不是就终止提交。检查通过后调用connect( )与JobTracker建立连接,此过程会创建JobClient实例,重点在于实例的初始化方法init(JobConf conf)。
public void init(JobConf conf) throws IOException { setConf(conf); String tracker = conf.get("mapred.job.tracker", "local"); tasklogtimeout = conf.getInt( TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT); this.ugi = UserGroupInformation.getCurrentUser(); if ("local".equals(tracker)) { conf.setNumMapTasks(1); this.jobSubmitClient = new LocalJobRunner(conf); } else if (!HAUtil.isHAEnabled(conf, tracker)) { this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf); } else { this.jobSubmitClient = createRPCProxy(tracker, conf); }
JobClient的init方法中会根据配置的jobTracker来创建JobSubmissionProtocol实例jobSubmitClient,jobClient就是通过jobSubmitClient来向jobTracker提交作业的。如果用户没有配置mapred.job.tracker,其默认就为“local”,这种情况下会创建LocalJobRunner作为jobTracker,作业将会在本地而非分布式环境中执行。有关作业的本地执行,后续博客会详细介绍,本篇只介绍分布式执行。如果配置mapred.job.tracker为分布式环境中的jobTracker的地址,就创建jobTracker的rpc代理,由该代理来完成与jobTracker的交互。
至此客户端提交代码所需要的实例都已创建完成(重点是jobClient实例和他的成员jobSubmitClient),下个阶段就是数据准备阶段。Job通过对jobClient调用submitJobInternal(conf)方法来真正完成作业的提交。
public RunningJob submitJobInternal(final JobConf job) throws FileNotFoundException, ClassNotFoundException, InterruptedException, IOException { return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() { public RunningJob run() throws FileNotFoundException, ClassNotFoundException,InterruptedException,IOException{ JobConf jobCopy = job; /*jobStagingArea是所有作业提交到JobTracker的文件系统中的根目录*/ Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,jobCopy); JobID jobId = jobSubmitClient.getNewJobId(); /*在JobTracker的文件系统上为当前提交的作业生成根目录(jobStagingArea/jobId/),该目录下存放的文件包括: *依赖的普通文件 存放目录:jobStagingArea/jobId/files *依赖的jar包 存放目录:jobStagingArea/jobId/libjars *依赖的归档文件 存放目录:jobStagingArea/jobId/archives *MR jar包 存放目录:jobStagingArea/jobId/ */ Path submitJobDir = new Path(jobStagingArea, jobId.toString()); jobCopy.set("mapreduce.job.dir", submitJobDir.toString()); JobStatus status = null; try { populateTokenCache(jobCopy, jobCopy.getCredentials()); copyAndConfigureFiles(jobCopy, submitJobDir); //文件生成、上传 // get delegation token for the dir TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),new Path [] {submitJobDir},jobCopy); /* 作业配置文件job.xml文件在JobTracker的文件系统上的存放路径 */ Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); /* 获取配置的reduceTask的数量,默认为1 */ int reduces = jobCopy.getNumReduceTasks(); InetAddress ip = InetAddress.getLocalHost(); if (ip != null) { job.setJobSubmitHostAddress(ip.getHostAddress()); job.setJobSubmitHostName(ip.getHostName()); } JobContext context = new JobContextImpl(jobCopy, jobId); jobCopy = (JobConf)context.getConfiguration(); // Check the output specification /* 通过OutputFormat检查作业输出目录是否有效 */ if (reduces == 0 ? jobCopy.getUseNewMapper() : jobCopy.getUseNewReducer()) { org.apache.hadoop.mapreduce.OutputFormat<?,?> output = ReflectionUtils.newInstance(context.getOutputFormatClass(),jobCopy); output.checkOutputSpecs(context); } else { jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy); } // Create the splits for the job FileSystem fs = submitJobDir.getFileSystem(jobCopy); LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir)); /* 对输入进行分片,并将分片信息和分片源信息上传到JobTracker的文件系统中的submitJobDir目录下, * 最后返回分片数作为MapTask的数量 */ int maps = writeSplits(context, submitJobDir); jobCopy.setNumMapTasks(maps); // write "queue admins of the queue to which job is being submitted" to job file. String queue = jobCopy.getQueueName(); AccessControlList acl = jobSubmitClient.getQueueAdmins(queue); jobCopy.set(QueueManager.toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString()); /* 至此所有需要写到job.xml文件中的配置信息都已经写了, * 此处将最终配置文件上传到JobTracker的文件系统中 */ FSDataOutputStream out = FileSystem.create(fs, submitJobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); // removing jobtoken referrals before copying the jobconf to HDFS // as the tasks don't need this setting, actually they may break // because of it if present as the referral will point to a // different job. TokenCache.cleanUpTokenReferral(jobCopy); try { jobCopy.writeXml(out); } finally { out.close(); } /* * 至此准备阶段完成,接下来由jobTracker代理通过RPC正式提交作业到集群 */ printTokens(jobId, jobCopy.getCredentials()); status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()); if (status != null) { return new NetworkedJob(status); } else { throw new IOException("Could not launch job"); } } finally { if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); if (fs != null && submitJobDir != null) fs.delete(submitJobDir, true); } } } }); }
本过程中,JobTracker的代理jobSubmitClient首先向jobTracker申请作业信息提交到jobTracker文件系统上的根目录和jobId,然后将作业依赖的各类文件(jar,普通文件、分档文件、job.xml等)提交到各自在jobTracker文件系统中的目录里面。依赖文件的处理在copyAndConfigureFiles (JobConf job, Path submitJobDir, short replication)中进行,源码如下:
private void copyAndConfigureFiles(JobConf job, Path submitJobDir, short replication) throws IOException, InterruptedException { if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) { LOG.warn("Use GenericOptionsParser for parsing the arguments. " + "Applications should implement Tool for the same."); } String files = job.get("tmpfiles"); String libjars = job.get("tmpjars"); String archives = job.get("tmparchives"); // // Figure out what fs the JobTracker is using. Copy the // job to it, under a temporary name. This allows DFS to work, // and under the local fs also provides UNIX-like object loading // semantics. (that is, if the job file is deleted right after // submission, we can still run the submission to completion) // // Create a number of filenames in the JobTracker's fs namespace FileSystem fs = submitJobDir.getFileSystem(job); LOG.debug("default FileSystem: " + fs.getUri()); if (fs.exists(submitJobDir)) { throw new IOException("Not submitting job. Job directory " + submitJobDir +" already exists!! This is unexpected.Please check what's there in" + " that directory"); } submitJobDir = fs.makeQualified(submitJobDir); FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms); Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir); Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir); Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir); // add all the command line files/ jars and archive // first copy them to jobtrackers filesystem //在jobTracker的文件系统中为tmpfiles创建目录,从本地将文件上传到该目录,并将其放到分布式缓存中。 if (files != null) { FileSystem.mkdirs(fs, filesDir, mapredSysPerms); String[] fileArr = files.split(","); for (String tmpFile: fileArr) { URI tmpURI; try { tmpURI = new URI(tmpFile); } catch (URISyntaxException e) { throw new IllegalArgumentException(e); } Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication); try { URI pathURI = getPathURI(newPath, tmpURI.getFragment()); DistributedCache.addCacheFile(pathURI, job); } catch(URISyntaxException ue) { //should not throw a uri exception throw new IOException("Failed to create uri for " + tmpFile, ue); } DistributedCache.createSymlink(job); } } //在jobTracker的文件系统中为作业依赖的jar包创建目录,从本地将jar上传到该目录,并将其放到分布式缓存中。 if (libjars != null) { FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms); String[] libjarsArr = libjars.split(","); for (String tmpjars: libjarsArr) { Path tmp = new Path(tmpjars); Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication); DistributedCache.addFileToClassPath( new Path(newPath.toUri().getPath()), job, fs); } } //在jobTracker的文件系统中为作业依赖的归档文件创建目录,从本地将归档文件上传到该目录,并将其放到分布式缓存中。 if (archives != null) { FileSystem.mkdirs(fs, archivesDir, mapredSysPerms); String[] archivesArr = archives.split(","); for (String tmpArchives: archivesArr) { URI tmpURI; try { tmpURI = new URI(tmpArchives); } catch (URISyntaxException e) { throw new IllegalArgumentException(e); } Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication); try { URI pathURI = getPathURI(newPath, tmpURI.getFragment()); DistributedCache.addCacheArchive(pathURI, job); } catch(URISyntaxException ue) { //should not throw an uri excpetion throw new IOException("Failed to create uri for " + tmpArchives, ue); } DistributedCache.createSymlink(job); } } TrackerDistributedCacheManager.validate(job); TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job); TrackerDistributedCacheManager.getDelegationTokens(job, job.getCredentials()); //将作业的mr jar包上传到jobTracker的文件系统中 String originalJarPath = job.getJar(); if (originalJarPath != null) { if ("".equals(job.getJobName())){ job.setJobName(new Path(originalJarPath).getName()); } Path originalJarFile = new Path(originalJarPath); URI jobJarURI = originalJarFile.toUri(); // If the job jar is already in fs, we don't need to copy it from local fs if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null || !(jobJarURI.getScheme().equals(fs.getUri().getScheme()) && jobJarURI.getAuthority().equals( fs.getUri().getAuthority()))) { Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir); job.setJar(submitJarFile.toString()); fs.copyFromLocalFile(originalJarFile, submitJarFile); fs.setReplication(submitJarFile, replication); fs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); } } else { LOG.warn("No job jar file set. User classes may not be found. "+ "See JobConf(Class) or JobConf#setJar(String)."); } }
该方法执行完之后,jobTracker文件系统中将添加如下文件:
submitJobInternal( )方法在处理完以上文件之后会创建OutputFormat实例来验证输出路径是否合法,验证通过之后就开始处理输入数据分片。
首先jobClient调用其方法writeNewSplits(JobContext job, Path jobSubmitDir) ,该方法通过反射创建InputFormat实例(这里使用的是FileInputFormat),然后用该实例创建分片,对分片排序之后会将分片信息写入文件上传到jobTracker的文件系统中,源码如下:
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); //通过反射创建InputFormat实例 InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); //InputFormat实例创建分片 List<InputSplit> splits = input.getSplits(job); T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); //对分片按照大小降序排序,这样可以保证数据量大的分片现行被处理 Arrays.sort(array, new SplitComparator()); //将分片信息写入文件中 JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); //返回split数量作为mapTask的数量 return array.length; }
InputFormat实例创建好分片信息后,由JobSplitWriter负责写分片信息到文件中,源码如下:
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, Configuration conf, FileSystem fs, T[] splits) throws IOException, InterruptedException { //写分片源数据信息到文件job.spilt中并创建分片元数据信息 FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf); SplitMetaInfo[] info = writeNewSplits(conf, splits, out); out.close(); //写分片元数据信息到文件job.splitmetainfo中 writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info); }
分片信息文件(job.split 和 job.splitmetainfo)会被上传到JobTracker的文件系统中,目录结构如下:
job.split文件记录了每个分片的源数据信息:分片的数据结构(本例中就是FileSplit)和输入文件在hdfs上的逻辑位置。job.splitmetainfo文件记录了每个分片的元数据。两个文件的内容如下:
最后作业需要的所有配置信息都已经配置到JobConf(jobCopy)中了,该配置对象包含的所有信息会被写到jobTracker文件系统中的job.xml文件中。
到此所有数据准备完毕,jobSubmitClient调用submitJob( )方法将作业提交给jobTracker,作业提交完成,接下来作业的调度就交由jobTracker完成。
相关推荐
【Hadoop 作业提交流程详解】 在Hadoop生态系统中,提交一个MapReduce作业通常通过执行类似`bin/hadoop jar xxx.jar mainclass args`的命令来完成。这个过程看似简单,实际上涉及到了多个步骤和组件的交互。下面...
在IT行业中,尤其是在大数据处理领域,Hadoop...通过这些方法,你可以深入理解Hadoop作业的执行流程,找出并修复问题,提升作业的性能和稳定性。在实际操作中,不断实践和学习,你的Hadoop开发和调试技能将会日益精进。
3. **MapReduce作业提交**:`hadoop jar`命令用于提交MapReduce作业,该命令指定包含主类的JAR文件和作业配置参数。 4. **数据分片与映射**:在Map阶段,Hadoop将输入数据分割成多个块,并在不同的节点上并行执行...
`Job`类是提交任务的入口点,包含了任务配置和提交流程。`InputFormat`和`OutputFormat`定义了数据输入和输出的格式。`MapTask`和`ReduceTask`类则负责执行Map和Reduce任务,它们如何调度和执行是理解MapReduce工作...
Hadoop运行流程详解 Hadoop是一个开源分布式计算框架,核心由两个主要组件构成:HDFS(Hadoop Distributed File System)和MapReduce。...理解这一执行流程对于优化Hadoop作业性能和解决可能出现的问题至关重要。
3. Hadoop作业调度流程包括作业提交、任务分配、任务执行、任务监控等步骤。 4. 推测执行任务是为了解决计算集群中的一些问题,例如资源分布不均、程序的错误或者是硬件的故障等。 5. 集群中的慢节点是指某些节点的...
6. **JobSubmitter**:`LinuxS.java`可能是一个JobSubmitter类,负责将Mapper和Reducer组合成一个完整的Hadoop作业,并提交到集群进行执行。JobSubmitter会设置作业参数,如输入输出路径、分区函数、排序规则等,...
例如,可以创建RESTful API,让Web应用向Hadoop提交MapReduce作业,获取处理结果。 流程图中可能涵盖了以下步骤: 1. 数据采集:通过Web服务器日志、用户行为跟踪等方式收集数据。 2. 数据预处理:清洗、转换数据,...
在Map-Reduce作业提交的过程中,客户端会与ResourceManger交互,将作业分解为任务,并分配到各个Worker节点上的Container执行。 接着,我们深入到JobTracker(或ResourceManager)的工作流程。作业提交首先会创建一...
【Yarn工作机制和作业提交流程】是Hadoop生态系统中至关重要的一部分,它负责管理和调度分布式计算资源,确保高效地执行MapReduce等运算程序。Yarn,全称Yet Another Resource Negotiator,是一个资源调度平台,它的...
本文将深入探讨Hadoop的核心组件,包括Configuration、JobClient、JobConf以及JobTracker、TaskTracker等,并详细解析Hadoop作业提交的流程。 首先,我们从Configuration类开始。Configuration类是Hadoop中的基础...
Hadoop作业调度由JobTracker和TaskTracker协同完成。JobTracker运行在master节点,负责整个作业的调度和管理,它会创建Job任务,并将这些任务分配给TaskTracker。TaskTracker运行在集群的每个节点上,负责执行具体的...
例如,Bash Shell环境下的脚本可以使用grep、awk、sed等工具进行数据过滤、格式化和修改,或者调用Hadoop的命令行工具如hadoop fs、hadoop jar等进行数据的读写和MapReduce作业的提交。 具体来说,ETL流程可能包括...
5. **日志查看**:在Eclipse中查看Hadoop作业的运行日志,便于问题定位和性能分析。 6. **版本兼容性**:Hadoop Eclipse Plugin 2.9.0适用于Hadoop 2.x系列,确保与较新版本的Hadoop集群良好兼容。 四、使用步骤 ...
3. Hadoop配置:学习配置Hadoop环境,包括集群设置和作业提交。 4. 数据输入与输出:了解如何将数据输入Hadoop系统以及处理后的结果输出。 5. 错误处理和容错性:理解Hadoop的容错机制,如何处理节点故障等问题。 6....
3. **作业提交**:开发者可以直接在Eclipse中编译、打包和提交MapReduce作业到Hadoop集群,无需手动执行命令行操作,极大地简化了开发流程。 4. **调试支持**:插件提供了强大的调试功能,可以在本地模拟运行...
7. **工具集成**:有许多开源工具可以帮助我们远程提交和管理Hadoop作业,如Hadoop命令行工具、Hadoop的Web UI、Apache Oozie工作流管理系统等。这些工具提供了方便的接口,使开发者能便捷地与集群交互。 8. **安全...
5. 部署和运行:在大规模Hadoop集群上提交作业,处理整个数据集。 6. 结果分析:收集MapReduce输出,可能需要进一步的数据可视化或业务逻辑处理,以生成易于理解的报告。 通过对全国各省市酒店数据的分析与处理,...
二、Hadoop作业流程 Hadoop作业通常包括以下步骤: 1. 数据上传:用户将数据文件上传到HDFS。 2. 作业提交:提交MapReduce程序到JobTracker(Hadoop 1.x中的角色,Hadoop 2.x中由YARN的ResourceManager替代)。 3. ...
### Hadoop.MapReduce 分析 #### 一、概述 Hadoop.MapReduce 是一种分布式计算模型,主要用于处理大规模数据集。其基本思想源自Google提出的MapReduce论文。本文将深入解析Hadoop.MapReduce的工作原理、核心组件...