- 浏览: 951092 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
前言:unscheduleJob针对TriggerKey,而deleteJob针对jobKey下面我们进一步从源码来分析,从job,trriger
调度,再到unscheduleJob,deleteJob
//Job存储的实现
TriggerKey与JobKey包装类
JobKey与JobDetail包装类
//JobDetail
//JobKey
//TriggerKey
//Key
总结:
首先从Scheduler.scheduleJob(JobDetail jobDetail, Trigger trigger) 调度job,
实际上就是将job存储到RAM中的jobsByGroup,jobsByKey对应的Map中, 将trigger存储到
triggers(List),triggersByKey,triggersByGroup对应的Map中,及timeTriggers的Treeset中。
Scheduler.unscheduleJob(TriggerKey triggerKey) 就是将triggerKey从triggersByKey,triggersByGroup,
triggers,timeTriggers中移除;Scheduler.deleteJob(JobKey jobKey)
除了从容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper>,并unscheduleJob
(TriggerKey triggerKey)列表 List<TriggerWrapper>中的所有TriggerWrapper,同时从jobsByKey,jobsByGroup
的移除对应jobKey的相关信息
调度,再到unscheduleJob,deleteJob
public class QuartzScheduler implements RemotableQuartzScheduler { public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException { validateState(); if(jobDetail == null) throw new SchedulerException("JobDetail cannot be null"); if(trigger == null) throw new SchedulerException("Trigger cannot be null"); if(jobDetail.getKey() == null) throw new SchedulerException("Job's key cannot be null"); if(jobDetail.getJobClass() == null) throw new SchedulerException("Job's class cannot be null"); OperableTrigger trig = (OperableTrigger)trigger; if(trigger.getJobKey() == null) //设置触发器的jobKey trig.setJobKey(jobDetail.getKey()); else if(!trigger.getJobKey().equals(jobDetail.getKey())) throw new SchedulerException("Trigger does not reference given job!"); trig.validate(); Calendar cal = null; if(trigger.getCalendarName() != null) cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName()); Date ft = trig.computeFirstFireTime(cal); if(ft == null) { throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString()); } else { //存储jobDetail, trig到RAMJobStore resources.getJobStore().storeJobAndTrigger(jobDetail, trig); //下面三个现在先不说以后再说 notifySchedulerListenersJobAdded(jobDetail); notifySchedulerThread(trigger.getNextFireTime().getTime()); notifySchedulerListenersSchduled(trigger); return ft; } public boolean unscheduleJob(TriggerKey triggerKey) throws SchedulerException { validateState(); //从容器中的RAMJobStore的triggersByKey Map中将triggerKey移除 if(resources.getJobStore().removeTrigger(triggerKey)) { notifySchedulerThread(0L); notifySchedulerListenersUnscheduled(triggerKey); } else { return false; } return true; } public boolean deleteJob(JobKey jobKey) throws SchedulerException { validateState(); boolean result = false; //获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper> List triggers = getTriggersOfJob(jobKey); for(Iterator i$ = triggers.iterator(); i$.hasNext();) { Trigger trigger = (Trigger)i$.next(); //移除触发器 if(!unscheduleJob(trigger.getKey())) { StringBuilder sb = (new StringBuilder()).append("Unable to unschedule trigger [").append(trigger.getKey()).append("] while deleting job [").append(jobKey).append("]"); throw new SchedulerException(sb.toString()); } result = true; } //从容器中的RAMJobStore的triggersByKey,trriger, //triggersByGroup,timeTriggers,中将jobKey对应的TrrigerWapper移除 //同时,从jobsByKey,jobsByGroup的移除对应jobKey的JobWrapper相关信息 result = resources.getJobStore().removeJob(jobKey) || result; if(result) { notifySchedulerThread(0L); notifySchedulerListenersJobDeleted(jobKey); } return result; } private static String VERSION_MAJOR; private static String VERSION_MINOR; private static String VERSION_ITERATION; private QuartzSchedulerResources resources; private QuartzSchedulerThread schedThread; private ThreadGroup threadGroup; private SchedulerContext context; private ListenerManager listenerManager; private HashMap internalJobListeners; private HashMap internalTriggerListeners; private ArrayList internalSchedulerListeners; private JobFactory jobFactory; ExecutingJobsManager jobMgr; ErrorLogger errLogger; private SchedulerSignaler signaler; private Random random; private ArrayList holdToPreventGC; private boolean signalOnSchedulingChange; private volatile boolean closed; private volatile boolean shuttingDown; private boolean boundRemotely; private QuartzSchedulerMBean jmxBean; private Date initialStart; private final Timer updateTimer; private final Logger log = LoggerFactory.getLogger(getClass()); static { Properties props; InputStream is; VERSION_MAJOR = "UNKNOWN"; VERSION_MINOR = "UNKNOWN"; VERSION_ITERATION = "UNKNOWN"; props = new Properties(); is = null; is = org/quartz/core/QuartzScheduler.getResourceAsStream("quartz-build.properties"); if(is != null) { props.load(is); String version = props.getProperty("version"); if(version != null) { String versionComponents[] = version.split("\\."); VERSION_MAJOR = versionComponents[0]; VERSION_MINOR = versionComponents[1]; if(versionComponents.length > 2) VERSION_ITERATION = versionComponents[2]; else VERSION_ITERATION = "0"; } else { LoggerFactory.getLogger(org/quartz/core/QuartzScheduler).error("Can't parse Quartz version from quartz-build.properties"); } } if(is != null) try { is.close(); } catch(Exception ignore) { } break MISSING_BLOCK_LABEL_181; Exception e; e; LoggerFactory.getLogger(org/quartz/core/QuartzScheduler).error("Error loading version info from quartz-build.properties.", e); if(is != null) try { is.close(); } catch(Exception ignore) { } break MISSING_BLOCK_LABEL_181; Exception exception; exception; if(is != null) try { is.close(); } catch(Exception ignore) { } throw exception; } }
//Job存储的实现
public class RAMJobStore implements JobStore { //获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper> public List getTriggersForJob(JobKey jobKey) { ArrayList trigList = new ArrayList(); synchronized(lock) { //从triggers中获取jobKey对应的TriggerWrapper Iterator i$ = triggers.iterator(); do { if(!i$.hasNext()) break; TriggerWrapper tw = (TriggerWrapper)i$.next(); if(tw.jobKey.equals(jobKey)) trigList.add((OperableTrigger)tw.trigger.clone()); } while(true); } return trigList; } public void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger) throws JobPersistenceException { storeJob(newJob, false); storeTrigger(newTrigger, false); } //存储newJob到jobsByKey,jobsByGroup public void storeJob(JobDetail newJob, boolean replaceExisting) throws ObjectAlreadyExistsException { JobWrapper jw = new JobWrapper((JobDetail)newJob.clone()); boolean repl = false; synchronized(lock) { if(jobsByKey.get(jw.key) != null) { if(!replaceExisting) throw new ObjectAlreadyExistsException(newJob); repl = true; } if(!repl) { HashMap grpMap = (HashMap)jobsByGroup.get(newJob.getKey().getGroup()); if(grpMap == null) { grpMap = new HashMap(100); jobsByGroup.put(newJob.getKey().getGroup(), grpMap); } //存储job到jobsByGroup grpMap.put(newJob.getKey(), jw); //存储job到jobsByKey jobsByKey.put(jw.key, jw); } else { JobWrapper orig = (JobWrapper)jobsByKey.get(jw.key); orig.jobDetail = jw.jobDetail; } } } //将newTrigger存储到triggersByKey,trriger,triggersByGroup,timeTriggers public void storeTrigger(OperableTrigger newTrigger, boolean replaceExisting) throws JobPersistenceException { TriggerWrapper tw = new TriggerWrapper((OperableTrigger)newTrigger.clone()); synchronized(lock) { if(triggersByKey.get(tw.key) != null) { if(!replaceExisting) throw new ObjectAlreadyExistsException(newTrigger); removeTrigger(newTrigger.getKey(), false); } if(retrieveJob(newTrigger.getJobKey()) == null) throw new JobPersistenceException((new StringBuilder()).append("The job (").append(newTrigger.getJobKey()).append(") referenced by the trigger does not exist.").toString()); //存储trriger到triggers triggers.add(tw); HashMap grpMap = (HashMap)triggersByGroup.get(newTrigger.getKey().getGroup()); if(grpMap == null) { grpMap = new HashMap(100); triggersByGroup.put(newTrigger.getKey().getGroup(), grpMap); } //存储trriger到triggersByGroup grpMap.put(newTrigger.getKey(), tw); //存储trriger到triggersByKey triggersByKey.put(tw.key, tw); if(pausedTriggerGroups.contains(newTrigger.getKey().getGroup()) || pausedJobGroups.contains(newTrigger.getJobKey().getGroup())) { tw.state = 4; if(blockedJobs.contains(tw.jobKey)) tw.state = 6; } else if(blockedJobs.contains(tw.jobKey)) tw.state = 5; else // 将trriger添加到timeTriggers timeTriggers.add(tw); } } public boolean removeTrigger(TriggerKey triggerKey) { return removeTrigger(triggerKey, true); } //从triggersByKey,triggers,timeTriggers移除key private boolean removeTrigger(TriggerKey key, boolean removeOrphanedJob) { boolean found; synchronized(lock) { //从triggersByKey移除 found = triggersByKey.remove(key) != null; if(found) { TriggerWrapper tw = null; HashMap grpMap = (HashMap)triggersByGroup.get(key.getGroup()); if(grpMap != null) { ////从triggersByGroup移除 grpMap.remove(key); if(grpMap.size() == 0) triggersByGroup.remove(key.getGroup()); } Iterator tgs = triggers.iterator(); do { if(!tgs.hasNext()) break; tw = (TriggerWrapper)tgs.next(); if(!key.equals(tw.key)) continue; //从triggers移除 tgs.remove(); break; } while(true); //从timeTriggers移除 timeTriggers.remove(tw); if(removeOrphanedJob) { JobWrapper jw = (JobWrapper)jobsByKey.get(tw.jobKey); List trigs = getTriggersForJob(tw.jobKey); if((trigs == null || trigs.size() == 0) && !jw.jobDetail.isDurable() && removeJob(jw.key)) signaler.notifySchedulerListenersJobDeleted(jw.key); } } } return found; } //从容器中的RAMJobStore的triggersByKey,trriger, //triggersByGroup,timeTriggers,中将jobKey对应的TrrigerWapper移除 //同时,从jobsByKey,jobsByGroup的移除对应jobKey的JobWrapper相关信息 public boolean removeJob(JobKey jobKey) { boolean found = false; synchronized(lock) { //获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper> List triggersOfJob = getTriggersForJob(jobKey); for(Iterator i$ = triggersOfJob.iterator(); i$.hasNext();) { OperableTrigger trig = (OperableTrigger)i$.next(); //从triggersByKey,triggers,timeTriggers移除key removeTrigger(trig.getKey()); found = true; } //从jobsByKey移除jobKey found = (jobsByKey.remove(jobKey) != null) | found; if(found) { HashMap grpMap = (HashMap)jobsByGroup.get(jobKey.getGroup()); if(grpMap != null) { //从jobsByGroup对应的group总移除jobKey grpMap.remove(jobKey); if(grpMap.size() == 0) jobsByGroup.remove(jobKey.getGroup()); } } } return found; } protected HashMap jobsByKey;//HashMap<JobKey,JobWrapper> protected HashMap triggersByKey;//HashMap<TrrigerKey,TriggerWrapper> protected HashMap jobsByGroup;//HashMap<String,HashMap<JobKey,JobWrapper>>,,key为JobKey.group protected HashMap triggersByGroup;//HashMap<String,HashMap<TrrigerKey,TriggerWrapper>>,,key为TrrigerKey.group protected TreeSet timeTriggers;//TreeSet<TrrigerWrapper> 红黑树 protected HashMap calendarsByName; protected ArrayList triggers; //List<TriggerWrapper> protected final Object lock = new Object(); protected HashSet pausedTriggerGroups; protected HashSet pausedJobGroups; protected HashSet blockedJobs; protected long misfireThreshold; protected SchedulerSignaler signaler; private final Logger log = LoggerFactory.getLogger(getClass()); private static final AtomicLong ftrCtr = new AtomicLong(System.currentTimeMillis()); }
TriggerKey与JobKey包装类
class TriggerWrapper { TriggerWrapper(OperableTrigger trigger) { state = 0; if(trigger == null) { throw new IllegalArgumentException("Trigger cannot be null!"); } else { this.trigger = trigger; key = trigger.getKey(); jobKey = trigger.getJobKey(); return; } } public boolean equals(Object obj) { if(obj instanceof TriggerWrapper) { TriggerWrapper tw = (TriggerWrapper)obj; if(tw.key.equals(key)) return true; } return false; } public int hashCode() { return key.hashCode(); } public OperableTrigger getTrigger() { return trigger; } public final TriggerKey key; public final JobKey jobKey; public final OperableTrigger trigger; public int state; public static final int STATE_WAITING = 0; public static final int STATE_ACQUIRED = 1; public static final int STATE_EXECUTING = 2; public static final int STATE_COMPLETE = 3; public static final int STATE_PAUSED = 4; public static final int STATE_BLOCKED = 5; public static final int STATE_PAUSED_BLOCKED = 6; public static final int STATE_ERROR = 7; }
JobKey与JobDetail包装类
class JobWrapper { JobWrapper(JobDetail jobDetail) { this.jobDetail = jobDetail; key = jobDetail.getKey(); } public boolean equals(Object obj) { if(obj instanceof JobWrapper) { JobWrapper jw = (JobWrapper)obj; if(jw.key.equals(key)) return true; } return false; } public int hashCode() { return key.hashCode(); } public JobKey key; public JobDetail jobDetail; }
//JobDetail
public class JobDetailImpl implements Cloneable, Serializable, JobDetail { private static final long serialVersionUID = -6069784757781506897L; private String name; private String group; private String description; private Class jobClass; private JobDataMap jobDataMap;//拥有共享数据,就是一个Map private boolean durability; private boolean shouldRecover; private transient JobKey key; }
//JobKey
public final class JobKey extends Key { public JobKey(String name) { super(name, null); } public JobKey(String name, String group) { super(name, group); } public static JobKey jobKey(String name) { return new JobKey(name, null); } public static JobKey jobKey(String name, String group) { return new JobKey(name, group); } private static final long serialVersionUID = -6073883950062574010L; }
//TriggerKey
public final class TriggerKey extends Key { public TriggerKey(String name) { super(name, null); } public TriggerKey(String name, String group) { super(name, group); } public static TriggerKey triggerKey(String name) { return new TriggerKey(name, null); } public static TriggerKey triggerKey(String name, String group) { return new TriggerKey(name, group); } private static final long serialVersionUID = 8070357886703449660L; }
//Key
package org.quartz.utils; import java.io.Serializable; import java.util.UUID; public class Key implements Serializable, Comparable { public Key(String name, String group) { if(name == null) throw new IllegalArgumentException("Name cannot be null."); this.name = name; if(group != null) this.group = group; else this.group = "DEFAULT"; } public String getName() { return name; } public String getGroup() { return group; } private static final long serialVersionUID = -7141167957642391350L; public static final String DEFAULT_GROUP = "DEFAULT"; private final String name; private final String group; }
总结:
首先从Scheduler.scheduleJob(JobDetail jobDetail, Trigger trigger) 调度job,
实际上就是将job存储到RAM中的jobsByGroup,jobsByKey对应的Map中, 将trigger存储到
triggers(List),triggersByKey,triggersByGroup对应的Map中,及timeTriggers的Treeset中。
Scheduler.unscheduleJob(TriggerKey triggerKey) 就是将triggerKey从triggersByKey,triggersByGroup,
triggers,timeTriggers中移除;Scheduler.deleteJob(JobKey jobKey)
除了从容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper>,并unscheduleJob
(TriggerKey triggerKey)列表 List<TriggerWrapper>中的所有TriggerWrapper,同时从jobsByKey,jobsByGroup
的移除对应jobKey的相关信息
发表评论
-
TreeSet在Quartz任务调度过程中的作用
2017-08-24 23:43 681红黑树详解:http://www.cnblogs.com/sk ... -
Quartz使用与Spring集成系列教程
2016-10-26 09:48 458Quartz的使用:http://donald-draper. ... -
Spring与Quartz集成-源码分析
2016-09-13 11:50 2636在阅读以下文章之前,如果对Quartz任务调度不是很熟悉,请看 ... -
Spring与Quartz集成详解
2016-09-09 17:52 2761首先这个所有的依赖包就不需要多讲了,首先下载Quazrt发布包 ... -
Quartz 任务存储JobStoreTX 持久化之RDB-源码分析
2016-09-08 18:24 2217Quartz 任务存储JobStoreTX 持久化之RDB:h ... -
Quartz 任务存储JobStoreTX 持久化之RDB
2016-09-08 11:52 2708Quartz储存方式之JDBC JobStoreTX:http ... -
Quartz任务调度源码分析
2016-09-07 13:12 3508Quartz的使用:http://donald-draper. ... -
Quartz的job调度源码分析
2016-09-06 13:03 8Quartz的使用:http://donald ... -
Quartzs的job,trriger监听器源码分析
2016-09-06 11:15 1439Quartz的使用:http://donald-draper. ... -
Quartz的job、触发器的暂停与恢复源码分析
2016-09-06 09:01 5087Quartz的使用:http://donald-draper. ... -
Quartz的Scheduler初始化源码分析
2016-09-05 16:14 3509Quartz的使用:http://donald ... -
Quartz的使用
2016-09-05 11:39 1744Quartz使用总结:http://www.cnblogs.c ...
相关推荐
java代码 job_触发器例子
oracle job调度存储过程 触发器 定时更新数据库
python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts可视化。python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts...
oracle中job调度存储过程 触发器 定时更新数据库.rar
xxl job是一个轻量级的任务调度平台,该文档是对其做了一个源码分析。
查找被锁表,查看表空间大小,禁用的触发器,启用的触发器
xxl-job-admin 作为调度中心,有可视化的web管理界面,可将其作为一个模块以源码的方式引入到我们的项目中,我们可对其进行自定义的修改,但一般不需要;xxl-job-core 是 XXL-JOB的核心,绝大多数情况下我们不用对其...
xxl-job源码: 包含:xxl-job-admin、xxl-job-core、xxl-job-executor
xxl-job任务调度中心
XXL-JOB分布式任务调度系统培训PPT
基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job...
51job求职招聘平台网站模板源码 源码描述: 一、源码特点 1、51job求职招聘平台网站模板,橙色风格,全套模板,包括首页、会员验证、公司招聘、简历管理、登录、注册、搜索等多个网站模板页面。 二、菜单功能 1、...
28Job-V1.0源码 此源码为网上招聘求职系统。类似 51Job ,源码提供学习使用 用户有两种登陆方式,一个是个人用户,登陆后,可以填写简历。 一个是企业用户,登陆后,有自己的管理平台,管理自己的企业资料和收到...
定时执行存储过程,创建job、删除job等等
关联我的博客XXL-JOB快速入门的源码,需要的朋友可自行下载
elastic-job-1.0.5相关源码
学习Hadoop源码过程中做的源码分析,共享一下,PPT中有我的邮箱,可以互相探讨。Hadoop源码分析(client端提交job到rm端)
触发器在数据库里以独立的对象存储,它与存储过程不同的是,存储过程通过其它程序来启动运行或直接启动运行,而触发器是由一个事件来启动运行。即触发器是当某个事件发生时自动地隐式运行。并且,触发器不能接收...
当当elastic-job控制台jar包,elastic-job-lite-console-3.0.0.M1-SNAPSHOT,本人从git下载源码后编译生成的jar。 $ 解压 elastic-job-lite-console-3.0.0.M1-SNAPSHOT.rar $ cd elastic-job-lite-console-3.0.0.M1-...