Job的初始化
1.WordCount.java
public static void main(String[] args) throws Exception {
// 初始化Configuration
Configuration conf = new Configuration();
--> Configuration.java 默认构造函数执行this(true);即
public Configuration(boolean loadDefaults) {
// 设定加载默认设置
this.loadDefaults = loadDefaults;
// 保存最近使用最多的key和value
updatingResource = new HashMap<String, String>();
// 将Configuration放入一个WeakHashMap中
synchronized(Configuration.class) {
REGISTRY.put(this, null);
}
}
...
// 初始化Job
Job job = new Job(conf, "word count");
--> JobConf.java conf作为构造参数传入,将其封装为JobConf
private Credentials credentials = new Credentials(); // 初始化credentials
public JobConf(Configuration conf) {
super(conf);// JobConf继承自Configuration,即将传进来的conf复制到另一个的Configuration中
if (conf instanceof JobConf) { // 该条件不符合
...
}
checkAndWarnDeprecation(); // 在conf中获取已经废弃的key,如果存在则向用户警告使用了过期的参数
}
--> JobContextImpl.java Job的构造方法最终调用父类JobContextImpl的构造方法
public JobContextImpl(Configuration conf, JobID jobId) {
if (conf instanceof JobConf) {
this.conf = (JobConf)conf; // 初始化JobConf
} else {
this.conf = new JobConf(conf);
}
this.jobId = jobId;
this.credentials = this.conf.getCredentials(); // 将JobConf中初始化的credentials赋给JobContextImpl
try {
this.ugi = UserGroupInformation.getCurrentUser(); // 初始化UGI
} catch (IOException e) {
throw new RuntimeException(e);
}
}
job.setJarByClass(WordCount.class);
--> JobConf.java 该方法会最终调用到JobConf的setJarByClass()
public void setJarByClass(Class cls) {
String jar = findContainingJar(cls); // 通过class名字获取到class所在的jar包
if (jar != null) {
setJar(jar); // 调用Configuration的set(JobContext.JAR, jar);方法
}
}
job.setMapperClass(TokenizerMapper.class); // 调用Configuration的setClass();方法
...
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
--> FileInputFormat.java中的addInputPath方法
public static void addInputPath(Job job,
Path path) throws IOException {
Configuration conf = job.getConfiguration();
path = path.getFileSystem(conf).makeQualified(path); // 获取文件系统,并补全路径
String dirStr = StringUtils.escapeString(path.toString());
String dirs = conf.get(INPUT_DIR);
conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
2. Job.java
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit(); // 提交Job
--> Job.java submit()方法
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
connect(); // 初始化Cluster类,代理所有Job的操作和JobSubmitter的submitInternal操作(加载yarn-site.xml,mapred-site.xml等,最近的用于ClientProtocolProvider加载服务,通过在xml中指定框架的名字。初始化Cluster需要的UGI),该类用于确定当前的job运行在何种模式(Yarn、Local、JobTracker)通过ServiceLoader<ClientProtocolProvider> frameworkLoader加载ClientProtocolProvider,通过ClientProtocolProvider启动运行的框架(Yarn等等)
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
// JobSubmitter:1、检查input和output 2、计算map数 3、设置DistributeCache即用户Map/Reduce程序运行需要的jar包 4、复制job的相关信息到hdfs上 5、提交Job
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
public boolean monitorAndPrintJob()
throws IOException, InterruptedException {
String lastReport = null; // 记录最近一次的report
Job.TaskStatusFilter filter;
Configuration clientConf = getConfiguration();
filter = Job.getTaskOutputFilter(clientConf);
JobID jobId = getJobID();
LOG.info("Running job: " + jobId);
int eventCounter = 0;
boolean profiling = getProfileEnabled();
IntegerRanges mapRanges = getProfileTaskRange(true);
IntegerRanges reduceRanges = getProfileTaskRange(false);
int progMonitorPollIntervalMillis =
Job.getProgressPollInterval(clientConf);
/* make sure to report full progress after the job is done */
boolean reportedAfterCompletion = false;
boolean reportedUberMode = false;
while (!isComplete() || !reportedAfterCompletion) {
if (isComplete()) {
reportedAfterCompletion = true;
} else {
Thread.sleep(progMonitorPollIntervalMillis);
}
if (status.getState() == JobStatus.State.PREP) {
continue;
}
if (!reportedUberMode) {
reportedUberMode = true;
LOG.info("Job " + jobId + " running in uber mode : " + isUber());
}
String report =
(" map " + StringUtils.formatPercent(mapProgress(), 0)+
" reduce " +
StringUtils.formatPercent(reduceProgress(), 0));
if (!report.equals(lastReport)) {
LOG.info(report);
lastReport = report;
}
TaskCompletionEvent[] events =
getTaskCompletionEvents(eventCounter, 10);
eventCounter += events.length;
printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
}
boolean success = isSuccessful();
if (success) {
LOG.info("Job " + jobId + " completed successfully");
} else {
LOG.info("Job " + jobId + " failed with state " + status.getState() +
" due to: " + status.getFailureInfo());
}
Counters counters = getCounters();
if (counters != null) {
LOG.info(counters.toString());
}
return success;
}
分享到:
相关推荐
quartz-job数据库初始化表一些表结构整理
xxl-job是一个很受欢迎的调度工具,但是官网只给了mysql,这提供一下,pgsql的数据库初始化sql
xxl-job-2.2.0版本Oracle建表及初始化语句,建表后需要同步修改Mapper文件,增加oracle驱动依赖,修改数据库连接配置。
【Debug跟踪Hadoop3.0.0源码之MapReduce Job提交流程】第一节 Configuration和Job对象的初始化前言Configuration和Job对象的初始化后记跳转 前言 不得不说,在此前我对阅读源码这件事是拒绝的,一方面也知道自己非读...
拥有文章列表查看(freemarker、CDN、ElasticSearch)、热点文章计算(kafka、xxl-job)、CMS自媒体端文章发布审核(延迟队列)、项目部署数据迁移(Hbase、Jenkins、docker)等功能的头条文章系统的初始化工程maven...
拥有文章列表查看(freemarker、CDN、ElasticSearch)、热点文章计算(kafka、xxl-job)、CMS自媒体端文章发布审核(延迟队列)、项目部署数据迁移(Hbase、Jenkins、docker)等功能的头条文章系统的初始化工程maven...
一、设置初始化参数 job_queue_processes sql> alter system set job_queue_processes=n;(n>0) job_queue_processes最大值为1000 查看job queue 后台进程 sql>select name,description from v$...
xxl-job-2.4.0 postgresql初始化脚本
xxl-job-executor的gin中间件背景xxl-job-executor-go是xxl-job的golang执行器,可以独立运行,有时候我们要与项目或者框架(如:gin框架)...)const Port = "9999"func main() {//初始化执行器exec := xxl.NewExecut
主要介绍了sql server定时作业调用Kettle job出错的快速解决方法,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
GitHub动作,用于安装和初始化以在所有受支持的运行器平台上运行行的实例。 该操作旨在提供一个环境来测试依赖IPFS的DApp。 输入项 此操作将自动检测运行程序平台功能,例如操作系统和处理器体系结构。 ipfs_version...
如此好的分布式任务调度框架弃之可惜,所以就让作者来捡个便宜吧(官方版所有地址均已下线,作者发现原始初始化协议故使用Apache 2.0以示尊重)。 重置版主要在官方原版的基础上优化了以下几点: 任务项状态管理全部...
// 初始化任务队列 JobScheduler scheduler = new JobScheduler("default"); // 向队列中提交任务,每1s打印一行文本 scheduler.addJob("print-hello-world-every-10s", 1000L, new JobMethod() { @Override ...
初始化相关参数job_queue_processes alter system set job_queue_processes=39 scope=spfile;//最大值不能超过1000 ;job_queue_interval = 10 //调度作业刷新频率秒为单位 job_queue_process 表示oracle能够并发的...
工作门户 Job Portal是一个基于MERN Stack的网络应用程序,可帮助简化工作申请流程。 它允许用户选择那里的角色(申请人/招聘者),并创建一个帐户。...初始化网络应用程序的说明: 在机器上安装Node J
作业主管Python SDK Python SDK客户端,用于将HPC作业提交给CyberGIS Job Supervisor基本用法要求... 有几种初始化Session对象接口的方法。 from job_supervisor_client import *# create a new Session objec
init_mysql, 初始化mysql, 返回一个MYSQL *, 一般用来初始化local_mysql query_mysql, 执行一个mysql语句, 格式为query_mysql (local_mysql, "mysql语句, 其中格式和printf的格式相同, 例如delete from %s等", 所...
init_mysql, 初始化mysql, 返回一个MYSQL *, 一般用来初始化local_mysql query_mysql, 执行一个mysql语句, 格式为query_mysql (local_mysql, "mysql语句, 其中格式和printf的格式相同, 例如delete from %s等", 所...
1、运行程序,由检查教师给出文件名,该文件中存有内存目前状况的位示图的数据(0和1的文件)。(程序应做提示,界面友好)。 2、你所编制的程序应读入...//初始化作业表头 p=new jobs; strcpy(p->jobname,"null");
Java版水果管理系统源码 ...jenkins创建统一pipeline类型初始化job(有必要为初始化job新建view。job名称,例:devops-init-jenkinsjob),使用pipeline script from SCM 模式的Git项配置初始化pipeline仓库地址