`
Donald_Draper
  • 浏览: 951084 次
社区版块
存档分类
最新评论

Quartz的job、trigger监听器源码分析

阅读更多
Quartz的使用:http://donald-draper.iteye.com/blog/2321886
Quartz的Scheduler初始化源码分析:http://donald-draper.iteye.com/blog/2322730
Quartz的job、触发器的暂停与恢复源码分析:http://donald-draper.iteye.com/blog/2322823
Quartz的Job存储,触发器、任务删除,源码分析:http://donald-draper.iteye.com/blog/2322725
Quartz的job、trigger监听器源码分析:http://donald-draper.iteye.com/blog/2322863
Quartz 任务存储JobStoreTX 持久化之RDB:http://donald-draper.iteye.com/blog/2323297
Quartz 任务存储JobStoreTX 持久化之RDB-源码分析:http://donald-draper.iteye.com/blog/2323409
Quartz任务调度源码分析:http://donald-draper.iteye.com/blog/2323118
Spring与Quartz集成详解:http://donald-draper.iteye.com/blog/2323591
Spring与Quartz集成-源码分析:http://donald-draper.iteye.com/blog/2324132
这一节,我们来探索一下scheduleJob(JobDetail jobDetail, Trigger trigger)在把job和trigger存储到JobStore之后又做了哪些事情,以及这些事情是如何实现的。
public class QuartzScheduler
    implements RemotableQuartzScheduler
{
 public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
        throws SchedulerException
    {
        validateState();
        if(jobDetail == null)
            throw new SchedulerException("JobDetail cannot be null");
        if(trigger == null)
            throw new SchedulerException("Trigger cannot be null");
        if(jobDetail.getKey() == null)
            throw new SchedulerException("Job's key cannot be null");
        if(jobDetail.getJobClass() == null)
            throw new SchedulerException("Job's class cannot be null");
	//包装触发器
        OperableTrigger trig = (OperableTrigger)trigger;
        if(trigger.getJobKey() == null)
	    //设置触发器jobKey
            trig.setJobKey(jobDetail.getKey());
        else
        if(!trigger.getJobKey().equals(jobDetail.getKey()))
            throw new SchedulerException("Trigger does not reference given job!");
        trig.validate();
        Calendar cal = null;
        if(trigger.getCalendarName() != null)
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
        Date ft = trig.computeFirstFireTime(cal);
        if(ft == null)
        {
            throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString());
        } else
        {
	    //存储job and trriger到jobStrore
            resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
	    //添加jobDetail到调度监听器
            notifySchedulerListenersJobAdded(jobDetail);
            //通知调度器下一刻调度时间
            notifySchedulerThread(trigger.getNextFireTime().getTime());
             //添加trigger到调度监听器
            notifySchedulerListenersSchduled(trigger);
            return ft;
        }
     //添加jobDetail
     public void notifySchedulerListenersJobAdded(JobDetail jobDetail)
    {
        List schedListeners = buildSchedulerListenerList();
        for(Iterator i$ = schedListeners.iterator(); i$.hasNext();)
        {
            SchedulerListener sl = (SchedulerListener)i$.next();
            try
            {
                sl.jobAdded(jobDetail);
            }
            catch(Exception e)
            {
                getLog().error("Error while notifying SchedulerListener of JobAdded.", e);
            }
        }

    }
    //获取调度监听器
     private List buildSchedulerListenerList()
    {
        List allListeners = new LinkedList();
        allListeners.addAll(getListenerManager().getSchedulerListeners());
        allListeners.addAll(getInternalSchedulerListeners());
        return allListeners;
    }
    //获取内部调度器
      public List getInternalSchedulerListeners()
    {
        ArrayList arraylist = internalSchedulerListeners;
        JVM INSTR monitorenter ;
        return Collections.unmodifiableList(new ArrayList(internalSchedulerListeners));
        Exception exception;
        exception;
        throw exception;
    }
     //通知调度器下一刻调度时间
    protected void notifySchedulerThread(long candidateNewNextFireTime)
    {
        if(isSignalOnSchedulingChange())
            signaler.signalSchedulingChange(candidateNewNextFireTime);
    }
    //添加trigger到调度监听器
    public void notifySchedulerListenersSchduled(Trigger trigger)
    {
        List schedListeners = buildSchedulerListenerList();
        for(Iterator i$ = schedListeners.iterator(); i$.hasNext();)
        {
            SchedulerListener sl = (SchedulerListener)i$.next();
            try
            {
                sl.jobScheduled(trigger);
            }
            catch(Exception e)
            {
                getLog().error((new StringBuilder()).append("Error while notifying SchedulerListener of scheduled job.  Triger=").append(trigger.getKey()).toString(), e);
            }
        }

    }
    private static String VERSION_MAJOR;
    private static String VERSION_MINOR;
    private static String VERSION_ITERATION;
    private QuartzSchedulerResources resources;
    private QuartzSchedulerThread schedThread;
    private ThreadGroup threadGroup;
    private SchedulerContext context;
    private ListenerManager listenerManager;//监听管理器
    private HashMap internalJobListeners;
    private HashMap internalTriggerListeners;
    private ArrayList internalSchedulerListeners;
    private JobFactory jobFactory;
    ExecutingJobsManager jobMgr;
    ErrorLogger errLogger;
    private SchedulerSignaler signaler;
    private Random random;
    private ArrayList holdToPreventGC;
    private boolean signalOnSchedulingChange;
    private volatile boolean closed;
    private volatile boolean shuttingDown;
    private boolean boundRemotely;
    private QuartzSchedulerMBean jmxBean;
    private Date initialStart;
    private final Timer updateTimer;
}

//调度监听器
 public class ListenerManagerImpl
    implements ListenerManager
{
public ListenerManagerImpl()
    {
        globalJobListeners = new LinkedHashMap(10);
        globalTriggerListeners = new LinkedHashMap(10);
        globalJobListenersMatchers = new LinkedHashMap(10);
        globalTriggerListenersMatchers = new LinkedHashMap(10);
        schedulerListeners = new ArrayList(10);
    }
    //添加调度监听器
     public void addSchedulerListener(SchedulerListener schedulerListener)
    {
        synchronized(schedulerListeners)
        {
            schedulerListeners.add(schedulerListener);
        }
    }
    //添加job监听器
    public void addJobListener(JobListener jobListener, List matchers)
    {
        if(jobListener.getName() == null || jobListener.getName().length() == 0)
            throw new IllegalArgumentException("JobListener name cannot be empty.");
        synchronized(globalJobListeners)
        {
            globalJobListeners.put(jobListener.getName(), jobListener);
            LinkedList matchersL = new LinkedList();
            if(matchers != null && matchers.size() > 0)
                matchersL.addAll(matchers);
            else
                matchersL.add(EverythingMatcher.allJobs());
            globalJobListenersMatchers.put(jobListener.getName(), matchersL);
        }
    }
     //添加Trigger监听器
      public void addTriggerListener(TriggerListener triggerListener, List matchers)
    {
        if(triggerListener.getName() == null || triggerListener.getName().length() == 0)
            throw new IllegalArgumentException("TriggerListener name cannot be empty.");
        synchronized(globalTriggerListeners)
        {
            globalTriggerListeners.put(triggerListener.getName(), triggerListener);
            LinkedList matchersL = new LinkedList();
            if(matchers != null && matchers.size() > 0)
                matchersL.addAll(matchers);
            else
                matchersL.add(EverythingMatcher.allTriggers());
            globalTriggerListenersMatchers.put(triggerListener.getName(), matchersL);
        }
    }
     public List getSchedulerListeners()
    {
        ArrayList arraylist = schedulerListeners;
        JVM INSTR monitorenter ;
        return Collections.unmodifiableList(new ArrayList(schedulerListeners));
        Exception exception;
        exception;
        throw exception;
    }

    private Map globalJobListeners;//LinkedHashMap<String,JobListener>,key为JobListener.getName(),全局job监听器
    private Map globalTriggerListeners;//LinkedHashMap<String,TriggerListener>,key为TriggerListener.getName(),全局Trriger监听器
    private Map globalJobListenersMatchers;
    private Map globalTriggerListenersMatchers;
    private ArrayList schedulerListeners;//List<SchedulerListener>调度监听器
}

/**
 * JMX bean for the scheduler (excerpt). It is itself a {@code SchedulerListener}: the
 * job-added and job-scheduled callbacks are turned into JMX notifications. The
 * {@code emitter} and {@code sequenceNumber} members used below are defined outside this
 * excerpt.
 */
public class QuartzSchedulerMBeanImpl extends StandardMBean
    implements NotificationEmitter, QuartzSchedulerMBean, JobListener, SchedulerListener
{
    /**
     * Builds a notification of the given type with this bean as its source and hands it to the
     * emitter.
     *
     * @param eventType notification type string
     * @param data user data to attach; nothing is attached when null
     * @param msg notification message; may be null
     */
    public void sendNotification(String eventType, Object data, String msg)
    {
        // Take the sequence number before the timestamp, matching argument evaluation order.
        final long sequence = sequenceNumber.incrementAndGet();
        final long timestamp = System.currentTimeMillis();
        final Notification notification = new Notification(eventType, this, sequence, timestamp, msg);
        if (data != null) {
            notification.setUserData(data);
        }
        emitter.sendNotification(notification);
    }

    /**
     * Same as {@link #sendNotification(String, Object, String)} with a null message.
     *
     * @param eventType notification type string
     * @param data user data to attach; nothing is attached when null
     */
    public void sendNotification(String eventType, Object data)
    {
        sendNotification(eventType, data, null);
    }

    /**
     * Publishes a {@code "jobAdded"} notification carrying the job detail converted by
     * {@code JobDetailSupport.toCompositeData}.
     *
     * @param jobDetail the job that was added
     */
    public void jobAdded(JobDetail jobDetail)
    {
        sendNotification(
            "jobAdded",
            JobDetailSupport.toCompositeData(jobDetail));
    }

    /**
     * Publishes a {@code "jobScheduled"} notification carrying the trigger converted by
     * {@code TriggerSupport.toCompositeData}.
     *
     * @param trigger the trigger that was scheduled
     */
    public void jobScheduled(Trigger trigger)
    {
        sendNotification(
            "jobScheduled",
            TriggerSupport.toCompositeData(trigger));
    }
}

/**
 * Scheduler signaler: relays scheduling-change signals to the scheduler thread it was
 * constructed with.
 */
public class SchedulerSignalerImpl
    implements SchedulerSignaler
{

    /**
     * Stores the scheduler and scheduler thread, and logs the concrete signaler type at info
     * level.
     *
     * @param sched the owning scheduler
     * @param schedThread the scheduler thread that will receive scheduling-change signals
     */
    public SchedulerSignalerImpl(QuartzScheduler sched, QuartzSchedulerThread schedThread)
    {
        // The decompiler printed the class literal as a slash-separated path, which is not
        // valid Java; use the class literal.
        log = LoggerFactory.getLogger(SchedulerSignalerImpl.class);
        this.sched = sched;
        this.schedThread = schedThread;
        log.info("Initialized Scheduler Signaller of type: " + getClass());
    }

    /**
     * Passes the candidate next fire time straight through to the scheduler thread.
     *
     * @param candidateNewNextFireTime candidate next fire time, forwarded unchanged
     */
    public void signalSchedulingChange(long candidateNewNextFireTime)
    {
        schedThread.signalSchedulingChange(candidateNewNextFireTime);
    }

    Logger log;
    protected QuartzScheduler sched;
    protected QuartzSchedulerThread schedThread;
}

/**
 * Scheduler thread (excerpt). Only the two-argument constructor and
 * {@code signalSchedulingChange} are shown; the constructor that initializes the fields and
 * the code that waits on {@code sigLock} are outside this excerpt.
 */
public class QuartzSchedulerThread extends Thread
{
    /**
     * Delegates to a four-argument constructor (not shown here), passing
     * {@code qsRsrcs.getMakeSchedulerThreadDaemon()} and the literal {@code 5}.
     * NOTE(review): the 5 is presumably a thread priority — confirm against that constructor.
     */
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs)
    {
        this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), 5);
    }
    /**
     * Records a scheduling change and wakes every thread waiting on {@code sigLock}.
     * While holding {@code sigLock} it sets the {@code signaled} flag, stores the candidate
     * next fire time, and calls {@code notifyAll()} on the lock.
     *
     * @param candidateNewNextFireTime candidate next fire time, stored as given
     */
     public void signalSchedulingChange(long candidateNewNextFireTime)
    {
        synchronized(sigLock)
        {
            signaled = true;
            signaledNextFireTime = candidateNewNextFireTime;
            sigLock.notifyAll();
        }
    }
 private QuartzScheduler qs;
    private QuartzSchedulerResources qsRsrcs;
    // Monitor used by signalSchedulingChange for both locking and notification.
    private final Object sigLock;
    // Set to true by signalSchedulingChange; where it is reset is not shown in this excerpt.
    private boolean signaled;
    // Last candidate next fire time passed to signalSchedulingChange.
    private long signaledNextFireTime;
    private boolean paused;
    private AtomicBoolean halted;
    private Random random;
    private static long DEFAULT_IDLE_WAIT_TIME = 30000L;
    private long idleWaitTime;
    private int idleWaitVariablness;
    private final Logger log;

}

/**
 * Callbacks for scheduler-level events. Interface methods are implicitly
 * {@code public abstract}, so the modifiers are omitted.
 */
public interface SchedulerListener
{
    /** Called with the trigger passed to {@code QuartzScheduler.notifySchedulerListenersSchduled}. */
    void jobScheduled(Trigger trigger);

    void jobUnscheduled(TriggerKey triggerkey);

    void triggerFinalized(Trigger trigger);

    void triggerPaused(TriggerKey triggerkey);

    void triggersPaused(String s);

    void triggerResumed(TriggerKey triggerkey);

    void triggersResumed(String s);

    /** Called with the job passed to {@code QuartzScheduler.notifySchedulerListenersJobAdded}. */
    void jobAdded(JobDetail jobdetail);

    void jobDeleted(JobKey jobkey);

    void jobPaused(JobKey jobkey);

    void jobsPaused(String s);

    void jobResumed(JobKey jobkey);

    void jobsResumed(String s);

    void schedulerError(String s, SchedulerException schedulerexception);

    void schedulerInStandbyMode();

    void schedulerStarted();

    void schedulerStarting();

    void schedulerShutdown();

    void schedulerShuttingdown();

    void schedulingDataCleared();
}

/**
 * Job listener. Interface methods are implicitly {@code public abstract}, so the modifiers
 * are omitted.
 */
public interface JobListener
{
    /**
     * Name of this listener. {@code ListenerManagerImpl.addJobListener} uses it as the
     * registry key and rejects a null or empty name.
     */
    String getName();

    void jobToBeExecuted(JobExecutionContext jobexecutioncontext);

    void jobExecutionVetoed(JobExecutionContext jobexecutioncontext);

    void jobWasExecuted(JobExecutionContext jobexecutioncontext, JobExecutionException jobexecutionexception);
}

/**
 * Trigger listener. Interface methods are implicitly {@code public abstract}, so the
 * modifiers are omitted.
 */
public interface TriggerListener
{
    /**
     * Name of this listener. {@code ListenerManagerImpl.addTriggerListener} uses it as the
     * registry key and rejects a null or empty name.
     */
    String getName();

    void triggerFired(Trigger trigger, JobExecutionContext jobexecutioncontext);

    boolean vetoJobExecution(Trigger trigger, JobExecutionContext jobexecutioncontext);

    void triggerMisfired(Trigger trigger);

    void triggerComplete(Trigger trigger, JobExecutionContext jobexecutioncontext, Trigger.CompletedExecutionInstruction completedexecutioninstruction);
}

总结:
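从以上的分析可以看出,job和trigger存储到JobStore之后,scheduleJob依次做了三件事:首先向所有调度监听器(SchedulerListener)发送job添加通知(jobAdded);然后把trigger的下一次触发时间通知给调度器线程,并唤醒所有在sigLock上等待的线程;最后向所有调度监听器发送trigger已调度的通知(jobScheduled)。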
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics