正常情况下,我们都是启动Hadoop任务的方式大概就是通过hadoop jar命令(或者写在shell中),事实上运行的hadoop就是一个包装的.sh,下面就是其中的最后一行,表示在其中执行一个java命令,调用hadoop的一些主类,同时配置一些hadoop的相关CLASSPATH,OPTS等选项:
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
当使用hadoop jar时,调用的$CLASS是下面的类型:
org.apache.hadoop.util.RunJar
而通过hadoop jar调用的主类,必须满足条件:
1,其中有main方法,类似下面的定义:
public static void main(String[] args) throws Exception { int result = ToolRunner.run(new ThisClass(), args); System.exit(result); }
2. ToolRunner中的的类需要有如下签名:
extends Configured implements Tool
并实现其中的public int run方法,在进行必要的hadoop job构造后,执行job的方法,同步等待执行结果并返回即可。
boolean success = job2.waitForCompletion(true);
大体的过程如下,以前也没有对整个过程进行质疑,直到我们有新的需要,在其他的客户端(java,而不是shell中)启动MapReduce任务,顺带好好看了这个函数waitForCompletion...
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } if (verbose) { monitorAndPrintJob(); } else { // get the completion poll interval from the client. int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf()); while (!isComplete()) { try { Thread.sleep(completionPollIntervalMillis); } catch (InterruptedException ie) { } } } return isSuccessful(); }
读完源码后发现,其实这个方法主要的目的就是看一下当前job的状态,如果没有提交,那么就执行submit操作(同步)将其提交到集群上。传递的参数verbose,如果是true,就是表示需要检测并打印job的相关信息(使用LOG.info()来打印到console中);否则,就等待任务的complete,反正这是个同步的操作;我们如果不需要监测任务的执行状态,仅仅进行一步submit就可以了。
那么就看一下monitorAndPrintJob这个函数吧,核心代码如下:
while (!isComplete() || !reportedAfterCompletion) { if (isComplete()) { reportedAfterCompletion = true; } else { Thread.sleep(progMonitorPollIntervalMillis); } if (status.getState() == JobStatus.State.PREP) { continue; } if (!reportedUberMode) { reportedUberMode = true; LOG.info("Job " + jobId + " running in uber mode : " + isUber()); } String report = (" map " + StringUtils.formatPercent(mapProgress(), 0)+ " reduce " + StringUtils.formatPercent(reduceProgress(), 0)); if (!report.equals(lastReport)) { LOG.info(report); lastReport = report; } TaskCompletionEvent[] events = getTaskCompletionEvents(eventCounter, 10); eventCounter += events.length; printTaskEvents(events, filter, profiling, mapRanges, reduceRanges); } boolean success = isSuccessful(); if (success) { LOG.info("Job " + jobId + " completed successfully"); } else { LOG.info("Job " + jobId + " failed with state " + status.getState() + " due to: " + status.getFailureInfo()); } Counters counters = getCounters(); if (counters != null) { LOG.info(counters.toString()); } return success;
其实就是定时循环去报告,检查状态,其中涉及到map和reduce的总体进度(通过某种算法计算出来的百分比),如果报告与上一次有变化,就进行输出。直到任务执行完成,并将其中的所有Counter均打印出来;如果任务失败,打印出任务执行失败的原因。
最终,MapReduce的执行日志大概就是这个样子:
15/04/13 15:01:08 INFO mapreduce.Job: map 96% reduce 28% 15/04/13 15:01:09 INFO mapreduce.Job: map 98% reduce 28% 15/04/13 15:01:10 INFO mapreduce.Job: map 98% reduce 32% 15/04/13 15:01:13 INFO mapreduce.Job: map 100% reduce 33% 15/04/13 15:01:16 INFO mapreduce.Job: map 100% reduce 37% 15/04/13 15:01:19 INFO mapreduce.Job: map 100% reduce 46% 15/04/13 15:01:22 INFO mapreduce.Job: map 100% reduce 54% 15/04/13 15:01:25 INFO mapreduce.Job: map 100% reduce 62% 15/04/13 15:01:28 INFO mapreduce.Job: map 100% reduce 68% 15/04/13 15:01:31 INFO mapreduce.Job: map 100% reduce 71% 15/04/13 15:01:34 INFO mapreduce.Job: map 100% reduce 76% 15/04/13 15:01:35 INFO mapreduce.Job: map 100% reduce 100% 15/04/13 15:01:37 INFO mapreduce.Job: Job job_1421455790417_222365 completed successfully 15/04/13 15:01:37 INFO mapreduce.Job: Counters: 46 File System Counters FILE: Number of bytes read=70894655 FILE: Number of bytes written=158829484 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=5151416348 HDFS: Number of bytes written=78309 HDFS: Number of read operations=1091 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0
如果我们需要将任务执行进度打印出来,就可以对这部分的功能就行改进并重写。
如果任务已经提交到集群,可以使用job对象的getTrackingURL()通过页面的形式查看到其具体详情,其中job对象还提供了一些可以操作集群任务的API,包括killTask, failTask等。
在任务执行完成后,就可以得到任务的所有Counter,使用Counter来对任务的各项指标进行详细统计是非常易用有效的方式,我们在任务中定义了大量的Counter来进行该操作(包括以后以后可能会评估任务的消耗,以便进行费用统计等…)。
如果需要启动多个任务,或以某种依赖的方式启动多个顺序MapReduce任务,可以使用JobControl来链接多个任务,JobControl的run方法,会根据任务的依赖关系来调度整个过程,并提供了一些常用的API,同样可以将任务kill/fail掉。但是如果流程的复杂性稍微比较高的情况下,建议使用一套工作流系统,例如oozie,便于管理以及应对流程上的变化。
相关推荐
(2)打开网站localhost:8088和localhost:50070,查看MapReduce任务启动情况 (3)写wordcount代码并把代码生成jar包 (4)运行命令 (1):把linus下的文件放到hdfs上 (2):运行MapReduce (5):查看运行结果 ...
工作原理即在综合评估应用服务器的资源利用状况的基础上,自适应地分配合理的计算、网络资源,启动并发进程,高效支撑对网络设备的信息采集、数据分析、业务维护等日常运维管理工作。实践表明,该平台已在电信设备...
IDEA SpringBoot集成hadoop运行环境,,本地启动项目,GET请求接口触发远程提交MapReduce任务至生产集群报错: Failing this attempt.Diagnostics: [2020-02-17 00:44:42.444]Exception from container-launch. ...
本书是Hadoop权威参考,程序员可从中探索如何分析海量数据集,管理员可以从中了解如何安装与运行Hadoop集群。 目录 第1章 初识Hadoop 数据!数据! 数据存储与分析 与其他系统相比 关系型数据库管理系统...
使用Hadoop分析数据 map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 Hadoop Pipes 编译运行 第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 数据...
Hadoop, Apache开源的分布式框架。源自Google GFS,BigTable,MapReduce 论文。 == HDFS == HDFS (Hadoop Distributed File System),Hadoop 分布式文件系统。...TaskTracker,启动和管理Map和Reduce子任务的节点。
数据存储和处理:Hadoop可以存储海量数据,并使用MapReduce等编程模型进行数据处理和分析。 数据挖掘和机器学习:通过Hadoop进行大规模数据处理,可以挖掘出数据中的规律和模式,为机器学习和数据挖掘提供支持。 ...
技术点79 MapReduce 函数、作业和管道的单元测试13.1.3 ...任务的JVM 启动参数13.2.4 高效调试的编码准则技术点84 调试和错误处理13.3 MapReduce 陷阱技术点85 MapReduce 反模式13.4 本章小结...
hadoop-env.sh 环境变量13#Hadoop Common组件 配置 core-site.xml13#HDFS NameNode,DataNode组建配置 hdfs-site.xml14#配置MapReduce - JobTracker TaskTracker 启动配置15#Hadoop单机系统,启动执行和异常检查17#...
YARN配置、启动与验证 YARN配置、启动与验证 序号 任务名称 任务一 YARN组件参数配置 任务二 MapReduce组件参数配置 任务三 配置SSH无密钥登录(slave1为主节点) 任务四 分发YARN与MapReduce配置文件 任务五 启动...
大数据字数 使用 Hadoop Yarn 的大数据分析/大数据科学的简化入门项目。 与 Spring Boot 和 Spring for Hadoop 集成,提供统一... 该项目将根据您的需要在本地启动 Hadoop,然后在运行您的 MapReduce 代码后将其关闭。
5,HDFS文件系统和Mapreduce框架的启动和运行.pdf; 6,运行框架自带的wordcount程序.pdf 适用人群:大数据开发 使用场景:Hadoop学习;HDFS学习;Mapreduce学习; 预期目标:通过学习,能快速完成工作和生产任务,...
这些类库提供⽂件系统和操作系统级别的抽象,包含启动 Hadoop必须的Java⽂件和脚本。 Hadoop YARN:这是⼀个⽤于任务排班和集群资源管理的框架。 Hadoop分布式⽂件系统(HDFS):⼀个分布式⽂件系统,提供⾼吞吐量的...
本篇英文论文通过三个具体...另外,在性能优化方面,Spark也采取了一些Hadoop没有充分考虑的因素,如内存宽带利用率、单位时间内的磁盘读写操作,及任务启动初始化时间等,所以相对于Hadoop,Spark表现出更优异的性能。
以Hadoop带的wordcount为例子(下面是启动行):用户提交一个任务以后,该任务由JobTracker协调,先执行Map阶段(图中M1,M2和M3),然后执行Reduce阶段(图中R1和R2)。Map阶段和Reduce阶段动作都受TaskTracker监控...
技术点83 解决任务的JVM 启动参数 13.2.4 高效调试的编码准则 技术点84 调试和错误处理 13.3 MapReduce 陷阱 技术点85 MapReduce 反模式 13.4 本章小结 附录A 相关技术 附录B Hadoop 内置的数据导入导出...
这里有几个考虑的点,一方面Hadoop MR task的启动及初始化时间较长,如果task过多,可能会导致任务启动和初始化时间远超逻辑处理时间,这种情况白白浪费了计算资源。另一方面,如果任务复杂,task过少又会导致任务...