- 浏览: 203808 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
Prepared:
Hadoop的几个明显缺点 -
CSunDNan:
...
openjdk jvm 方法字节码执行过程 -
幻影之蚀:
...
mysql 源码分析2 源码调试环境建立 -
shukongchengje:
紧急呼唤楼主,mysql代码从哪里弄?官网wiki上看的一头雾 ...
mysql源码分析 整体架构 -
yeshaoting:
好文章.不介意的话转载了.
jvm 字节码中文含义
Quartz是运用最广的任务调度框架,它最核心的组成部分是Scheduler、Trigger、JobDetail,然后给Scheduler配置个线程QuartzSchedulerThread,此线程在Scheduler初始化时启动,等待Scheduler start,然后从JobStore里拿到最近要触发的Trigger,以线程等待的方式等到trigger触发时间点,之后就是执行trigger所关联的JobDetail,最后清扫战场。Scheduler初始化、start和trigger执行的时序图如下所示:
其中,最核心的地方是QuartzSchedulerThread运行机制。下面解析一下它的run方法:
view plaincopy to clipboardprint?
public void run() {
boolean lastAcquireFailed = false;
while (!halted) {
try {
// check if we're supposed to pause...
synchronized (pauseLock) {
while (paused && !halted) {
try {
// wait until togglePause(false) is called...
pauseLock.wait(100L);
} catch (InterruptedException ignore) {
}
}
if (halted) {
break;
}
}
......
}
}
public void run() {
boolean lastAcquireFailed = false;
while (!halted) {
try {
// check if we're supposed to pause...
synchronized (pauseLock) {
while (paused && !halted) {
try {
// wait until togglePause(false) is called...
pauseLock.wait(100L);
} catch (InterruptedException ignore) {
}
}
if (halted) {
break;
}
}
......
}
}
以上是run的最开头的一段,不难看出这是在等待scheduler的start,实际上Quartz就是通过线程的wait或sleep来实现时间调度。继续看代码:
view plaincopy to clipboardprint?
Trigger trigger = null;
long now = System.currentTimeMillis();
signaled = false;
try {
trigger = qsRsrcs.getJobStore().acquireNextTrigger(
ctxt, now + idleWaitTime);
lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occured while scanning for the next trigger to fire.",
jpe);
}
lastAcquireFailed = true;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
}
Trigger trigger = null;
long now = System.currentTimeMillis();
signaled = false;
try {
trigger = qsRsrcs.getJobStore().acquireNextTrigger(
ctxt, now + idleWaitTime);
lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occured while scanning for the next trigger to fire.",
jpe);
}
lastAcquireFailed = true;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
}
这段代码是从jobStore里拿到下一个要执行的trigger,一般情况下jobStore使用的是RAMJobStore,即trigger等相关信息存放在内存里,如果需要把任务持久化就得使用可持久化JobStore。继续看代码:
view plaincopy to clipboardprint?
now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
long spinInterval = 10;
int numPauses = (int) (timeUntilTrigger / spinInterval);
while (numPauses >= 0 && !signaled) {
try {
Thread.sleep(spinInterval);
} catch (InterruptedException ignore) {
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
numPauses = (int) (timeUntilTrigger / spinInterval);
}
if (signaled) {
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(
ctxt, trigger);
} catch (JobPersistenceException jpe) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'",
jpe);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
} catch (RuntimeException e) {
getLog().error(
"releaseTriggerRetryLoop: RuntimeException "
+e.getMessage(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
signaled = false;
continue;
}
now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
long spinInterval = 10;
int numPauses = (int) (timeUntilTrigger / spinInterval);
while (numPauses >= 0 && !signaled) {
try {
Thread.sleep(spinInterval);
} catch (InterruptedException ignore) {
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
numPauses = (int) (timeUntilTrigger / spinInterval);
}
if (signaled) {
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(
ctxt, trigger);
} catch (JobPersistenceException jpe) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'",
jpe);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
} catch (RuntimeException e) {
getLog().error(
"releaseTriggerRetryLoop: RuntimeException "
+e.getMessage(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
signaled = false;
continue;
}
此段代码是计算下一个trigger的执行时间和现在系统时间的差,然后通过循环线程sleep的方式暂停住此线程,一直等到trigger的执行时间点。继续看代码:
view plaincopy to clipboardprint?
import org.quartz.core.JobRunShell;
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {
try {
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
errorTriggerRetryLoop(bndle);
}
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
try {
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
}
import org.quartz.core.JobRunShell;
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {
try {
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
errorTriggerRetryLoop(bndle);
}
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
try {
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
}
此段代码就是包装trigger,然后通过以JobRunShell为载体,在threadpool里执行trigger所关联的jobDetail。
之后的代码就是清扫战场,就不在累述。
本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/cutesource/archive/2009/12/08/4965520.aspx
其中,最核心的地方是QuartzSchedulerThread运行机制。下面解析一下它的run方法:
view plaincopy to clipboardprint?
public void run() {
boolean lastAcquireFailed = false;
while (!halted) {
try {
// check if we're supposed to pause...
synchronized (pauseLock) {
while (paused && !halted) {
try {
// wait until togglePause(false) is called...
pauseLock.wait(100L);
} catch (InterruptedException ignore) {
}
}
if (halted) {
break;
}
}
......
}
}
public void run() {
boolean lastAcquireFailed = false;
while (!halted) {
try {
// check if we're supposed to pause...
synchronized (pauseLock) {
while (paused && !halted) {
try {
// wait until togglePause(false) is called...
pauseLock.wait(100L);
} catch (InterruptedException ignore) {
}
}
if (halted) {
break;
}
}
......
}
}
以上是run的最开头的一段,不难看出这是在等待scheduler的start,实际上Quartz就是通过线程的wait或sleep来实现时间调度。继续看代码:
view plaincopy to clipboardprint?
Trigger trigger = null;
long now = System.currentTimeMillis();
signaled = false;
try {
trigger = qsRsrcs.getJobStore().acquireNextTrigger(
ctxt, now + idleWaitTime);
lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occured while scanning for the next trigger to fire.",
jpe);
}
lastAcquireFailed = true;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
}
Trigger trigger = null;
long now = System.currentTimeMillis();
signaled = false;
try {
trigger = qsRsrcs.getJobStore().acquireNextTrigger(
ctxt, now + idleWaitTime);
lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occured while scanning for the next trigger to fire.",
jpe);
}
lastAcquireFailed = true;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
}
这段代码是从jobStore里拿到下一个要执行的trigger,一般情况下jobStore使用的是RAMJobStore,即trigger等相关信息存放在内存里,如果需要把任务持久化就得使用可持久化JobStore。继续看代码:
view plaincopy to clipboardprint?
now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
long spinInterval = 10;
int numPauses = (int) (timeUntilTrigger / spinInterval);
while (numPauses >= 0 && !signaled) {
try {
Thread.sleep(spinInterval);
} catch (InterruptedException ignore) {
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
numPauses = (int) (timeUntilTrigger / spinInterval);
}
if (signaled) {
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(
ctxt, trigger);
} catch (JobPersistenceException jpe) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'",
jpe);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
} catch (RuntimeException e) {
getLog().error(
"releaseTriggerRetryLoop: RuntimeException "
+e.getMessage(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
signaled = false;
continue;
}
now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
long spinInterval = 10;
int numPauses = (int) (timeUntilTrigger / spinInterval);
while (numPauses >= 0 && !signaled) {
try {
Thread.sleep(spinInterval);
} catch (InterruptedException ignore) {
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
numPauses = (int) (timeUntilTrigger / spinInterval);
}
if (signaled) {
try {
qsRsrcs.getJobStore().releaseAcquiredTrigger(
ctxt, trigger);
} catch (JobPersistenceException jpe) {
qs.notifySchedulerListenersError(
"An error occured while releasing trigger '"
+ trigger.getFullName() + "'",
jpe);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
} catch (RuntimeException e) {
getLog().error(
"releaseTriggerRetryLoop: RuntimeException "
+e.getMessage(), e);
// db connection must have failed... keep
// retrying until it's up...
releaseTriggerRetryLoop(trigger);
}
signaled = false;
continue;
}
此段代码是计算下一个trigger的执行时间和现在系统时间的差,然后通过循环线程sleep的方式暂停住此线程,一直等到trigger的执行时间点。继续看代码:
view plaincopy to clipboardprint?
import org.quartz.core.JobRunShell;
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {
try {
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
errorTriggerRetryLoop(bndle);
}
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
try {
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
}
import org.quartz.core.JobRunShell;
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
shell.initialize(qs, bndle);
} catch (SchedulerException se) {
try {
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
errorTriggerRetryLoop(bndle);
}
continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
try {
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
} catch (SchedulerException se2) {
qs.notifySchedulerListenersError(
"An error occured while placing job's triggers in error state '"
+ trigger.getFullName() + "'", se2);
// db connection must have failed... keep retrying
// until it's up...
releaseTriggerRetryLoop(trigger);
}
}
此段代码就是包装trigger,然后通过以JobRunShell为载体,在threadpool里执行trigger所关联的jobDetail。
之后的代码就是清扫战场,就不在累述。
本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/cutesource/archive/2009/12/08/4965520.aspx
发表评论
-
fuse-2.7.3.tar.gz开源代码学习心得
2010-03-30 14:06 2536fuse-2.7.3.tar.gz开源代码 ... -
Linux网络文件系统
2010-03-30 13:25 1945Linux网络文件系统 (NFS) ... -
fuse调用流程分析
2010-03-29 15:30 1980fuse处理请求的整个流程如下图所示,以unlink操作为例进 ... -
基于MySQL的数据库集群系统的实现
2010-03-29 15:27 826您的WebApp系统是否正在 ... -
gearman 源码分析
2010-03-29 15:23 1950gearman 源码分析 -
lustre 基于对象存储的分布式实现
2010-03-29 15:20 1124lustre 基于对象存储的分布式实现 -
揭开j2ee集群面纱
2010-03-29 15:09 682揭开j2ee集群面纱 -
gearman 分布式图片转化处理框架
2010-03-29 15:07 867gearman 分布式图片转化处理框架 -
lustre文件架构 dht
2010-03-29 15:03 908lustre文件架构 dht ib总线 -
Terracotta/Quartz集成带来了基于内存集群的分布式任务调度功能
2010-03-18 13:21 1247Terracotta/Quartz集成带来了基于内存集群的分布 ... -
Lustre File System (转)
2010-03-16 17:03 1652Lustre File System 历史 Lustr ... -
Hadoop的几个明显缺点
2010-03-16 16:56 15016Hadoop的几个明显缺点如下: 1. 采用Java实现。Ja ... -
HDFS是一个不错的分布式文件系统
2010-03-16 16:55 1229HDFS是一个不错的分布式文件系统,它有很多的优点,但也存在有 ... -
Lustre系统的体系结构
2010-03-16 16:51 1081Lustre的主要组件有三个:先进的集群文件系统,基于对象的存 ... -
分布式文件系统 linux lustre
2010-03-16 16:48 2185Lustre名字是由Linux和Clusters演化而来,是为 ... -
gluster分析(转)
2010-03-16 16:45 4022引言 GlusterFS 是一个高层次的分布式文件系统解决方案 ... -
分布式文件系统 gluster
2010-03-16 16:44 1119分布式文件系统 gluster -
分布式锁资料
2010-03-16 16:44 841分布式锁服务 -
分布式调度框架 quartz
2010-03-16 16:39 1460分布式调度框架 quartz -
分布式调度 gearman(转)
2010-03-16 16:38 1399学学Gearman2009年07月11日 ...
相关推荐
quartz集群调度机制调研及源码分析,基于quartz 1.7版本
.net 简单任务调度平台,用于.net dll,exe的任务的挂载,任务的隔离,调度执行,访问权限控制,监控,管理,日志,错误预警,性能分析等
本repo是对quartz例程进行的一些说明和注解,长期水平,不足之处请多指教。... 石英教程系列博文之前发表在,现根据最新版本2.4.0重新校对和整理后,发布在上,后续会根据源码对一些核心组件进行分析。 石英使用教程
java8 源码 学习demo demo-01-springmvc 手写springmvc实现部分功能 demo-02-mytimer 学习java定时任务调度工具Timer ...源码分析 demo-10-jvm-lecture jvm 学习 demo-11-rpc 实现rpc demo-12-concurrency 并发面试
同时此项目配备了完善的开发文档( 60+ 页 ),涵盖了整个系统的需求分析、功能分析、系统设计、数据库设计、系统模块设计和系统实现等内容,可以为项目的学习者或使用者提供很好的辅助作用。 技术栈: Spring + ...
基于Java(SpringBoot) + Vue(Element UI) + UniApp开发的一套新零售移动电商系统,CRMEB系统就是集客户关系管理 + 营销电商系统,能够快速积累客户、会员数据分析、智能转化客户、有效提高销售、会员维护、网络营销...
第13章:本章重点对在Spring中如何使用Quartz进行任务调度进行了讲解,同时还涉及了使用JDK Timer和JDK 5.0执行器的知识。 第14章:介绍Spring 3.0新增的OXM模块,同时对XML技术进行了整体的了解。 第15章:对...
FlyCms比论坛更开放,比资讯网站更了解用户,以用户为中心,更懂社交,更注重内容的分析和分享。 ##### 用戶互助,精准推送 用户之间相互解答,分享产品使用心得,只向用户推送自己关心的消息,扩大价值,压缩...
Previus版本是封闭源代码,并使用Vulkan后端编写了Zig,并计划为具有平铺(类似Yabai)转换模式的Quartz形式(macOS Compositor)。 实际版本计划完全不同。 我的目标是使用X11的AwesomeWM风格的具声明性lua配置的...
- 统一下单(统一下单接口、统一扫码)、订单管理、数据分析、财务报表、商户管理、渠道管理、对账系统、系统监控。 ![统一扫码支付](project-bootstrap/zheng-pay.png) > zheng-ucenter 通用用户管理系统, 实现...
木兰湾管理系统是用于管理个人消费、锻炼、音乐、阅读、健康、饮食、人生经历等各个衣食住行信息的系统,通过提醒、计划模块利用调度系统来统计分析执行情况,并通过积分和评分体系来综合评估个人的总体状态。...
用于.net dll,exe的任务的挂载,任务的隔离,调度执行,访问权限控制,监控,管理,日志,错误预警,性能分析等。 平台基于quartz.net进行任务调度功能开发,采用C#代码编写,支持corn表达和第三方自定义的corn...
Java并发源码分析 - ThreadPoolExecutor ] () [ java ClassLoader 基本原理 ] () [ 解决eclipse显示jar源代码中文乱码问题 ] () [ 使用 RMI + ZooKeeper 实现远程调用框架 ] () [ Java 注解指导手册 – 终极向导 ] ...
Spring是目前各个企业必备...7.Sping核心源码分析 8.Spring整合Quartz等常见组件 ? ? 建议大家在学习本课程时,除了看视频以外,一定要将视频中的案例和代码等亲自动手敲两遍以上,并且动手做总结,从而对知识的理解。
陈开雄 Spring+3.x企业应用开发实战光盘源码 !!!!压缩包的jar包太多,太大无法上传,请谅解,需要的可以联系我 QQ:349721489 第1章:对Spring框架进行宏观性的概述,力图使读者建立起对Spring整体性的认识。 ...
java源码 项目说明 financePub是一个轻量级的,前后端分离的Java快速开发平台,在此平台上开发了收支记账管理软件; 支持MySQL、Oracle、SQL Server、PostgreSQL等主流数据库 前端地址: 具有如下特点 收入管理,对...
sdk源码 服务器推荐配置 项目说明 platform-plus是一个轻量级的,前后端分离的Java快速开发平台 优势 严格遵循阿里编码规约开发,便于阅读及二次开发 支持 MySQL、MariaDB、Oracle、DB2、H2、HSQL、SQLite、Postgre...
lenos(p为spring boot 2.0 版本扩展名)一款快速开发模块化脚手架,采用spring boot 2.0.1、spring、SpringMvc、mybatis、shiro、activiti工作流、swagger、ehcache、quartz、freemark…. 各领域数据集,工具源码,...
Spring Batch Admin 是一个后端采用spring boot 2, spring security , oauth2, Spring data jpa 作为基础框架,集成了quartz 提供调度能力,集成了Spring batch 提供批处理能力的管理系统。系统旨… 各领域数据集,...
连接池监视:监视当期系统数据库连接池状态,可进行分析SQL找出系统性能瓶颈。 数据库建模:在线创建数据库表及字段,并可进行可视化配置各字段前端显示组件,表单即可显示对应组件。 定时任务计划:后台可视化配置...