`
Donald_Draper
  • 浏览: 951092 次
社区版块
存档分类
最新评论

Quartz的job存储、触发器与job删除源码分析

阅读更多
前言:unscheduleJob针对的是TriggerKey,而deleteJob针对的是JobKey。下面我们从源码出发,先分析job和trigger的调度,
再分析unscheduleJob与deleteJob。
public class QuartzScheduler
    implements RemotableQuartzScheduler
{
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
        throws SchedulerException
    {
        validateState();
        if(jobDetail == null)
            throw new SchedulerException("JobDetail cannot be null");
        if(trigger == null)
            throw new SchedulerException("Trigger cannot be null");
        if(jobDetail.getKey() == null)
            throw new SchedulerException("Job's key cannot be null");
        if(jobDetail.getJobClass() == null)
            throw new SchedulerException("Job's class cannot be null");
        OperableTrigger trig = (OperableTrigger)trigger;
        if(trigger.getJobKey() == null)
            //设置触发器的jobKey
            trig.setJobKey(jobDetail.getKey());
        else
        if(!trigger.getJobKey().equals(jobDetail.getKey()))
            throw new SchedulerException("Trigger does not reference given job!");
        trig.validate();
        Calendar cal = null;
        if(trigger.getCalendarName() != null)
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
        Date ft = trig.computeFirstFireTime(cal);
        if(ft == null)
        {
            throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString());
        } else
        {
	    //存储jobDetail, trig到RAMJobStore
            resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
	    //下面三个现在先不说以后再说
            notifySchedulerListenersJobAdded(jobDetail);
            notifySchedulerThread(trigger.getNextFireTime().getTime());
            notifySchedulerListenersSchduled(trigger);
            return ft;
        }
    /**
     * Removes the trigger identified by {@code triggerKey} from the job store.
     *
     * @param triggerKey key of the trigger to remove
     * @return {@code true} if the job store removed the trigger, {@code false} otherwise
     * @throws SchedulerException declared for {@code validateState()} and the job store call
     */
    public boolean unscheduleJob(TriggerKey triggerKey)
        throws SchedulerException
    {
        validateState();
        // With RAMJobStore this drops the trigger from its in-memory trigger collections.
        boolean removed = resources.getJobStore().removeTrigger(triggerKey);
        if(!removed)
            return false;
        // Only a successful removal is signalled to the scheduler thread and listeners.
        notifySchedulerThread(0L);
        notifySchedulerListenersUnscheduled(triggerKey);
        return true;
    }

    /**
     * Unschedules every trigger of the given job and then removes the job itself
     * from the job store.
     *
     * @param jobKey key of the job to delete
     * @return {@code true} if at least one trigger was unscheduled or the job store
     *         removed the job
     * @throws SchedulerException if one of the job's triggers cannot be unscheduled
     */
    public boolean deleteJob(JobKey jobKey)
        throws SchedulerException
    {
        validateState();
        boolean anyTriggerRemoved = false;
        // getTriggersOfJob is defined outside this excerpt; presumably it returns the
        // triggers whose job key equals jobKey.
        List jobTriggers = getTriggersOfJob(jobKey);
        for(Object element : jobTriggers)
        {
            Trigger current = (Trigger)element;
            // Remove the trigger; a failure aborts the whole deletion.
            if(unscheduleJob(current.getKey()))
            {
                anyTriggerRemoved = true;
                continue;
            }
            throw new SchedulerException("Unable to unschedule trigger [" + current.getKey() + "] while deleting job [" + jobKey + "]");
        }
        // Always ask the store to remove the job, even if triggers were already removed above.
        boolean jobRemoved = resources.getJobStore().removeJob(jobKey);
        boolean deleted = jobRemoved || anyTriggerRemoved;
        if(deleted)
        {
            notifySchedulerThread(0L);
            notifySchedulerListenersJobDeleted(jobKey);
        }
        return deleted;
    }

    // Version components. The static initializer of this class first sets them to
    // "UNKNOWN" and then fills them from the "version" entry of quartz-build.properties.
    private static String VERSION_MAJOR;
    private static String VERSION_MINOR;
    private static String VERSION_ITERATION;
    // Gives access to the JobStore used by the scheduling methods (resources.getJobStore()).
    private QuartzSchedulerResources resources;
    // NOTE(review): the fields below are not used by the methods shown in this excerpt;
    // their roles are presumably those suggested by their names. Verify against the full class.
    private QuartzSchedulerThread schedThread;
    private ThreadGroup threadGroup;
    private SchedulerContext context;
    private ListenerManager listenerManager;
    private HashMap internalJobListeners;
    private HashMap internalTriggerListeners;
    private ArrayList internalSchedulerListeners;
    private JobFactory jobFactory;
    ExecutingJobsManager jobMgr;
    ErrorLogger errLogger;
    private SchedulerSignaler signaler;
    private Random random;
    private ArrayList holdToPreventGC;
    private boolean signalOnSchedulingChange;
    private volatile boolean closed;
    private volatile boolean shuttingDown;
    private boolean boundRemotely;
    private QuartzSchedulerMBean jmxBean;
    private Date initialStart;
    private final Timer updateTimer;
    // Per-instance logger named after the runtime class.
    private final Logger log = LoggerFactory.getLogger(getClass());

    static 
    {
        Properties props;
        InputStream is;
        VERSION_MAJOR = "UNKNOWN";
        VERSION_MINOR = "UNKNOWN";
        VERSION_ITERATION = "UNKNOWN";
        props = new Properties();
        is = null;
        is = org/quartz/core/QuartzScheduler.getResourceAsStream("quartz-build.properties");
        if(is != null)
        {
            props.load(is);
            String version = props.getProperty("version");
            if(version != null)
            {
                String versionComponents[] = version.split("\\.");
                VERSION_MAJOR = versionComponents[0];
                VERSION_MINOR = versionComponents[1];
                if(versionComponents.length > 2)
                    VERSION_ITERATION = versionComponents[2];
                else
                    VERSION_ITERATION = "0";
            } else
            {
                LoggerFactory.getLogger(org/quartz/core/QuartzScheduler).error("Can't parse Quartz version from quartz-build.properties");
            }
        }
        if(is != null)
            try
            {
                is.close();
            }
            catch(Exception ignore) { }
        break MISSING_BLOCK_LABEL_181;
        Exception e;
        e;
        LoggerFactory.getLogger(org/quartz/core/QuartzScheduler).error("Error loading version info from quartz-build.properties.", e);
        if(is != null)
            try
            {
                is.close();
            }
            catch(Exception ignore) { }
        break MISSING_BLOCK_LABEL_181;
        Exception exception;
        exception;
        if(is != null)
            try
            {
                is.close();
            }
            catch(Exception ignore) { }
        throw exception;
    }
}

//Job存储的实现
public class RAMJobStore
    implements JobStore
{
//获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper>
 /**
  * Collects clones of all stored triggers whose job key equals {@code jobKey}.
  *
  * @param jobKey the job key to match
  * @return a new list of cloned {@code OperableTrigger}s; empty when nothing matches
  */
 public List getTriggersForJob(JobKey jobKey)
    {
        ArrayList matches = new ArrayList();
        synchronized(lock)
        {
            // Scan the flat trigger list and keep only the wrappers bound to jobKey.
            for(Iterator it = triggers.iterator(); it.hasNext();)
            {
                TriggerWrapper wrapper = (TriggerWrapper)it.next();
                if(!wrapper.jobKey.equals(jobKey))
                    continue;
                // Callers receive a clone, not the stored trigger instance.
                matches.add((OperableTrigger)wrapper.trigger.clone());
            }
        }
        return matches;
    }
    /**
     * Stores a job and then its trigger by delegating to {@code storeJob} and
     * {@code storeTrigger}, both with {@code replaceExisting = false}.
     * The job is stored first; if storing the trigger then throws, this method
     * does not undo the job insertion.
     *
     * @param newJob     the job to store
     * @param newTrigger the trigger to store
     * @throws JobPersistenceException if either delegate throws, e.g. because the
     *         job or trigger key is already stored
     */
    public void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger)
        throws JobPersistenceException
    {
        storeJob(newJob, false);
        storeTrigger(newTrigger, false);
    }
    //存储newJob到jobsByKey,jobsByGroup
    /**
     * Stores a clone of {@code newJob} in {@code jobsByKey} and in the per-group
     * map held by {@code jobsByGroup}.
     *
     * @param newJob          the job to store
     * @param replaceExisting whether a job already stored under the same key may be
     *                        overwritten
     * @throws ObjectAlreadyExistsException if the key is already stored and
     *         {@code replaceExisting} is {@code false}
     */
    public void storeJob(JobDetail newJob, boolean replaceExisting)
        throws ObjectAlreadyExistsException
    {
        JobWrapper wrapper = new JobWrapper((JobDetail)newJob.clone());
        synchronized(lock)
        {
            JobWrapper existing = (JobWrapper)jobsByKey.get(wrapper.key);
            if(existing != null)
            {
                if(!replaceExisting)
                    throw new ObjectAlreadyExistsException(newJob);
                // Replacement keeps the stored wrapper and only swaps its job detail.
                existing.jobDetail = wrapper.jobDetail;
                return;
            }
            String group = newJob.getKey().getGroup();
            HashMap groupJobs = (HashMap)jobsByGroup.get(group);
            if(groupJobs == null)
            {
                // First job of this group: create its map.
                groupJobs = new HashMap(100);
                jobsByGroup.put(group, groupJobs);
            }
            // Register the job in the group map and in the key map.
            groupJobs.put(newJob.getKey(), wrapper);
            jobsByKey.put(wrapper.key, wrapper);
        }
    }
    //将newTrigger存储到triggers、triggersByGroup、triggersByKey;触发器未被暂停且其job未被阻塞时,还会加入timeTriggers
     /**
      * Stores a clone of {@code newTrigger} in {@code triggers},
      * {@code triggersByGroup} and {@code triggersByKey}. It is also added to
      * {@code timeTriggers} unless its group or job group is paused, or its job
      * is blocked.
      *
      * @param newTrigger      the trigger to store; its job must already be stored
      * @param replaceExisting whether a trigger already stored under the same key
      *                        may be replaced
      * @throws ObjectAlreadyExistsException if the key is already stored and
      *         {@code replaceExisting} is {@code false}
      * @throws JobPersistenceException if the job referenced by the trigger is not stored
      */
     public void storeTrigger(OperableTrigger newTrigger, boolean replaceExisting)
        throws JobPersistenceException
    {
        TriggerWrapper tw = new TriggerWrapper((OperableTrigger)newTrigger.clone());
        synchronized(lock)
        {
            if(triggersByKey.get(tw.key) != null)
            {
                if(!replaceExisting)
                    throw new ObjectAlreadyExistsException(newTrigger);
                // Replace: drop the old trigger but keep its job even if it becomes orphaned.
                removeTrigger(newTrigger.getKey(), false);
            }
            if(retrieveJob(newTrigger.getJobKey()) == null)
                throw new JobPersistenceException("The job (" + newTrigger.getJobKey() + ") referenced by the trigger does not exist.");
            // Add to the flat trigger list.
            triggers.add(tw);
            String group = newTrigger.getKey().getGroup();
            HashMap grpMap = (HashMap)triggersByGroup.get(group);
            if(grpMap == null)
            {
                grpMap = new HashMap(100);
                triggersByGroup.put(group, grpMap);
            }
            // Add to the per-group map and to the key map.
            grpMap.put(newTrigger.getKey(), tw);
            triggersByKey.put(tw.key, tw);
            boolean paused = pausedTriggerGroups.contains(group)
                || pausedJobGroups.contains(newTrigger.getJobKey().getGroup());
            boolean blocked = blockedJobs.contains(tw.jobKey);
            if(paused)
                tw.state = blocked ? TriggerWrapper.STATE_PAUSED_BLOCKED : TriggerWrapper.STATE_PAUSED;
            else if(blocked)
                tw.state = TriggerWrapper.STATE_BLOCKED;
            else
                // Only triggers that are neither paused nor blocked enter the time-ordered set.
                timeTriggers.add(tw);
        }
    }
     /**
      * Removes the trigger stored under {@code triggerKey}. It delegates to the
      * two-argument overload with {@code removeOrphanedJob = true}, so a non-durable
      * job left without triggers is removed as well.
      *
      * @param triggerKey key of the trigger to remove
      * @return {@code true} if a trigger was stored under that key
      */
     public boolean removeTrigger(TriggerKey triggerKey)
    {
        return removeTrigger(triggerKey, true);
    }
//从triggersByKey,triggers,timeTriggers移除key
    /**
     * Removes the trigger stored under {@code key} from {@code triggersByKey},
     * {@code triggersByGroup}, {@code triggers} and {@code timeTriggers}.
     *
     * @param key               key of the trigger to remove
     * @param removeOrphanedJob when {@code true}, a non-durable job left without any
     *                          trigger is removed too and listeners are notified
     * @return {@code true} if a trigger was stored under {@code key}
     */
    private boolean removeTrigger(TriggerKey key, boolean removeOrphanedJob)
    {
        synchronized(lock)
        {
            // Use the wrapper returned by the key map for every later step. The scan
            // loop's variable would be null or the wrong element if the list held no match.
            TriggerWrapper tw = (TriggerWrapper)triggersByKey.remove(key);
            if(tw == null)
                return false;
            HashMap grpMap = (HashMap)triggersByGroup.get(key.getGroup());
            if(grpMap != null)
            {
                // Remove from the per-group map and drop the group once it is empty.
                grpMap.remove(key);
                if(grpMap.size() == 0)
                    triggersByGroup.remove(key.getGroup());
            }
            // Remove the first entry with this key from the flat trigger list.
            for(Iterator tgs = triggers.iterator(); tgs.hasNext();)
            {
                TriggerWrapper candidate = (TriggerWrapper)tgs.next();
                if(key.equals(candidate.key))
                {
                    tgs.remove();
                    break;
                }
            }
            // Remove from the time-ordered set.
            timeTriggers.remove(tw);
            if(removeOrphanedJob)
            {
                JobWrapper jw = (JobWrapper)jobsByKey.get(tw.jobKey);
                List trigs = getTriggersForJob(tw.jobKey);
                boolean orphaned = trigs == null || trigs.size() == 0;
                // A missing job wrapper means there is nothing left to clean up.
                if(jw != null && orphaned && !jw.jobDetail.isDurable() && removeJob(jw.key))
                    signaler.notifySchedulerListenersJobDeleted(jw.key);
            }
            return true;
        }
    }
//从RAMJobStore的triggersByKey、triggers、
//triggersByGroup、timeTriggers中将jobKey对应的TriggerWrapper移除,
//同时从jobsByKey、jobsByGroup中移除jobKey对应的JobWrapper
    /**
     * Removes every trigger bound to {@code jobKey} and then the job itself from
     * {@code jobsByKey} and {@code jobsByGroup}.
     *
     * @param jobKey key of the job to remove
     * @return {@code true} if the job had at least one trigger or was stored under
     *         {@code jobKey}
     */
    public boolean removeJob(JobKey jobKey)
    {
        synchronized(lock)
        {
            boolean hadTriggers = false;
            // getTriggersForJob returns clones, so only their keys are used for removal.
            for(Object item : getTriggersForJob(jobKey))
            {
                OperableTrigger bound = (OperableTrigger)item;
                removeTrigger(bound.getKey());
                hadTriggers = true;
            }
            // The key-map removal always runs, whether or not triggers were found.
            boolean jobRemoved = jobsByKey.remove(jobKey) != null;
            if(!jobRemoved && !hadTriggers)
                return false;
            HashMap groupJobs = (HashMap)jobsByGroup.get(jobKey.getGroup());
            if(groupJobs != null)
            {
                // Remove the job from its group map and drop the group once it is empty.
                groupJobs.remove(jobKey);
                if(groupJobs.size() == 0)
                    jobsByGroup.remove(jobKey.getGroup());
            }
            return true;
        }
    }
    protected HashMap jobsByKey; // HashMap<JobKey, JobWrapper>
    protected HashMap triggersByKey; // HashMap<TriggerKey, TriggerWrapper>
    protected HashMap jobsByGroup; // HashMap<String, HashMap<JobKey, JobWrapper>>, keyed by the JobKey's group
    protected HashMap triggersByGroup; // HashMap<String, HashMap<TriggerKey, TriggerWrapper>>, keyed by the TriggerKey's group
    protected TreeSet timeTriggers; // TreeSet<TriggerWrapper>; holds only triggers stored as neither paused nor blocked
    protected HashMap calendarsByName;
    protected ArrayList triggers; // List<TriggerWrapper>, every stored trigger
    // Monitor that the store methods in this excerpt synchronize on.
    protected final Object lock = new Object();
    // Group names checked by storeTrigger to decide whether a new trigger starts paused.
    protected HashSet pausedTriggerGroups;
    protected HashSet pausedJobGroups;
    // Job keys checked by storeTrigger to decide whether a new trigger starts blocked.
    protected HashSet blockedJobs;
    protected long misfireThreshold;
    // Used by removeTrigger to notify listeners when an orphaned job is deleted.
    protected SchedulerSignaler signaler;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private static final AtomicLong ftrCtr = new AtomicLong(System.currentTimeMillis());
}

//TriggerWrapper:触发器及其TriggerKey、JobKey的包装类
/**
 * Pairs an {@code OperableTrigger} with its trigger key, its job key and a
 * mutable state code. Equality and hashing are based on the trigger key only.
 */
class TriggerWrapper
{
    public static final int STATE_WAITING = 0;
    public static final int STATE_ACQUIRED = 1;
    public static final int STATE_EXECUTING = 2;
    public static final int STATE_COMPLETE = 3;
    public static final int STATE_PAUSED = 4;
    public static final int STATE_BLOCKED = 5;
    public static final int STATE_PAUSED_BLOCKED = 6;
    public static final int STATE_ERROR = 7;

    public final TriggerKey key;
    public final JobKey jobKey;
    public final OperableTrigger trigger;
    // One of the STATE_* codes above; a new wrapper starts in STATE_WAITING.
    public int state;

    /**
     * Wraps the given trigger and caches its trigger key and job key.
     *
     * @param trigger the trigger to wrap
     * @throws IllegalArgumentException if {@code trigger} is {@code null}
     */
    TriggerWrapper(OperableTrigger trigger)
    {
        if(trigger == null)
            throw new IllegalArgumentException("Trigger cannot be null!");
        this.state = STATE_WAITING;
        this.trigger = trigger;
        this.key = trigger.getKey();
        this.jobKey = trigger.getJobKey();
    }

    /** Two wrappers are equal when their trigger keys are equal. */
    public boolean equals(Object obj)
    {
        if(!(obj instanceof TriggerWrapper))
            return false;
        TriggerWrapper other = (TriggerWrapper)obj;
        return other.key.equals(key);
    }

    /** Hash code of the trigger key, consistent with {@code equals}. */
    public int hashCode()
    {
        return key.hashCode();
    }

    /** Returns the wrapped trigger instance. */
    public OperableTrigger getTrigger()
    {
        return trigger;
    }
}

//JobWrapper:JobDetail及其JobKey的包装类
/**
 * Pairs a {@code JobDetail} with its job key. Equality and hashing are based on
 * the job key only.
 */
class JobWrapper
{
    public JobKey key;
    public JobDetail jobDetail;

    /**
     * Wraps the given job detail and caches its key.
     *
     * @param jobDetail the job detail to wrap
     */
    JobWrapper(JobDetail jobDetail)
    {
        this.jobDetail = jobDetail;
        this.key = jobDetail.getKey();
    }

    /** Two wrappers are equal when their job keys are equal. */
    public boolean equals(Object obj)
    {
        if(!(obj instanceof JobWrapper))
            return false;
        JobWrapper other = (JobWrapper)obj;
        return other.key.equals(key);
    }

    /** Hash code of the job key, consistent with {@code equals}. */
    public int hashCode()
    {
        return key.hashCode();
    }
}

//JobDetail
// Field layout of the JobDetail implementation; this excerpt shows no methods.
public class JobDetailImpl
    implements Cloneable, Serializable, JobDetail
{
 private static final long serialVersionUID = -6069784757781506897L;
    private String name;
    private String group;
    private String description;
    private Class jobClass;
    private JobDataMap jobDataMap; // data shared with the job; presumably a Map-like container (JobDataMap is not shown here)
    private boolean durability;
    private boolean shouldRecover;
    private transient JobKey key; // transient, so it is skipped by default Java serialization
}

//JobKey
/**
 * Key that identifies a job by name and group. The group handling is inherited
 * from {@code Key}.
 */
public final class JobKey extends Key
{
    private static final long serialVersionUID = -6073883950062574010L;

    /** Creates a key with the given name and a {@code null} group argument. */
    public JobKey(String name)
    {
        this(name, null);
    }

    /** Creates a key with the given name and group. */
    public JobKey(String name, String group)
    {
        super(name, group);
    }

    /** Factory equivalent of {@code new JobKey(name)}. */
    public static JobKey jobKey(String name)
    {
        return new JobKey(name);
    }

    /** Factory equivalent of {@code new JobKey(name, group)}. */
    public static JobKey jobKey(String name, String group)
    {
        JobKey created = new JobKey(name, group);
        return created;
    }
}

//TriggerKey
/**
 * Key that identifies a trigger by name and group. The group handling is
 * inherited from {@code Key}.
 */
public final class TriggerKey extends Key
{
    private static final long serialVersionUID = 8070357886703449660L;

    /** Creates a key with the given name and a {@code null} group argument. */
    public TriggerKey(String name)
    {
        this(name, null);
    }

    /** Creates a key with the given name and group. */
    public TriggerKey(String name, String group)
    {
        super(name, group);
    }

    /** Factory equivalent of {@code new TriggerKey(name)}. */
    public static TriggerKey triggerKey(String name)
    {
        return new TriggerKey(name);
    }

    /** Factory equivalent of {@code new TriggerKey(name, group)}. */
    public static TriggerKey triggerKey(String name, String group)
    {
        TriggerKey created = new TriggerKey(name, group);
        return created;
    }
}

//Key
package org.quartz.utils;

import java.io.Serializable;
import java.util.UUID;

public class Key
    implements Serializable, Comparable
{

    public Key(String name, String group)
    {
        if(name == null)
            throw new IllegalArgumentException("Name cannot be null.");
        this.name = name;
        if(group != null)
            this.group = group;
        else
            this.group = "DEFAULT";
    }

    public String getName()
    {
        return name;
    }

    public String getGroup()
    {
        return group;
    }
 
    private static final long serialVersionUID = -7141167957642391350L;
    public static final String DEFAULT_GROUP = "DEFAULT";
    private final String name;
    private final String group;
}


总结:
首先,Scheduler.scheduleJob(JobDetail jobDetail, Trigger trigger)调度job,
实际上就是将job存储到RAMJobStore的jobsByKey、jobsByGroup对应的Map中,将trigger存储到
triggers(List)以及triggersByKey、triggersByGroup对应的Map中,并加入timeTriggers这个TreeSet
(触发器所在组被暂停或其job被阻塞时不会加入timeTriggers)。
Scheduler.unscheduleJob(TriggerKey triggerKey)就是将triggerKey对应的TriggerWrapper从triggersByKey、triggersByGroup、
triggers、timeTriggers中移除;如果对应的job因此不再有任何触发器且不是durable的,该job也会被一并移除。
Scheduler.deleteJob(JobKey jobKey)先获取triggers中JobKey为jobKey的所有触发器,
对其中每一个调用unscheduleJob(TriggerKey triggerKey),然后从jobsByKey、jobsByGroup中
移除jobKey对应的JobWrapper。
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics