`
Donald_Draper
  • 浏览: 950586 次
社区版块
存档分类
最新评论

Quartz任务调度源码分析

阅读更多
Quartz的使用:http://donald-draper.iteye.com/blog/2321886
Quartz的Scheduler初始化源码分析:http://donald-draper.iteye.com/blog/2322730
Quartz的job、触发器的暂停与恢复源码分析:http://donald-draper.iteye.com/blog/2322823
Quartz的Job存储,触发器、任务删除,源码分析:http://donald-draper.iteye.com/blog/2322725
Quartz的job、trigger监听器源码分析:http://donald-draper.iteye.com/blog/2322863
Quartz 任务存储JobStoreTX 持久化之RDB:http://donald-draper.iteye.com/blog/2323297
Quartz 任务存储JobStoreTX 持久化之RDB-源码分析:http://donald-draper.iteye.com/blog/2323409
Quartz任务调度源码分析:http://donald-draper.iteye.com/blog/2323118
Spring与Quartz集成详解:http://donald-draper.iteye.com/blog/2323591
Spring与Quartz集成-源码分析:http://donald-draper.iteye.com/blog/2324132
LinkedList简介:http://www.cnblogs.com/lintong/p/4374292.html
LinkedList与ArrayList的优缺点:http://www.ttlsa.com/csharp/arraylist-and-linkedlist/
TreeMap原理:http://blog.csdn.net/chenssy/article/details/26668941
TreeMap与HashMap优缺点:http://blog.csdn.net/debugingstudy/article/details/12716327
TreeSet简介:http://www.cnblogs.com/ningvsban/archive/2013/05/06/3062535.html
TreeSet与TreeMap:http://blog.csdn.net/speedme/article/details/22661671
这里之所以先给出TreeSet、TreeMap与LinkedList的简介,是因为Quartz的调度容器会用到这些数据结构。QuartzScheduler在前面的文章中已经介绍过,这里直接看代码;也可以先看后面的总结,对整体有所把握之后再来看源码,思路会更清晰一些。
public class QuartzScheduler
    implements RemotableQuartzScheduler
{
     /**
      * Creates the scheduler; per the original author's note this happens while
      * StdSchedulerFactory is initializing.
      *
      * Besides resetting internal state, the constructor creates the
      * QuartzSchedulerThread, hands it to the configured ThreadExecutor, and
      * registers the built-in listeners (the job store when it is a JobListener,
      * the ExecutingJobsManager and the ErrorLogger).
      *
      * @param resources       shared scheduler resources (job store, thread executor, ...)
      * @param idleWaitTime    idle wait time for the scheduler thread; applied only when > 0
      * @param dbRetryInterval not read anywhere in this constructor as shown
      * @throws SchedulerException declared by the signature; no explicit throw is visible here
      */
     public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, long dbRetryInterval)
         throws SchedulerException
     {
         // --- plain state: containers, defaults and flags ---
         context = new SchedulerContext();
         listenerManager = new ListenerManagerImpl();
         internalJobListeners = new HashMap(10);
         internalTriggerListeners = new HashMap(10);
         internalSchedulerListeners = new ArrayList(10);
         jobFactory = new PropertySettingJobFactory();
         jobMgr = null;
         errLogger = null;
         random = new Random();
         holdToPreventGC = new ArrayList(5);
         signalOnSchedulingChange = true;
         closed = false;
         shuttingDown = false;
         boundRemotely = false;
         jmxBean = null;
         initialStart = null;
         this.resources = resources;

         // A job store that is itself a JobListener is registered as an internal job listener.
         if (resources.getJobStore() instanceof JobListener)
         {
             addInternalJobListener((JobListener) resources.getJobStore());
         }

         // --- scheduler thread: create it, then let the thread executor run it ---
         schedThread = new QuartzSchedulerThread(this, resources);
         ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
         schedThreadExecutor.execute(schedThread);
         if (idleWaitTime > 0L)
         {
             // Override the scheduler thread's idle wait time only for positive values.
             schedThread.setIdleWaitTime(idleWaitTime);
         }

         // --- built-in listeners ---
         jobMgr = new ExecutingJobsManager();
         addInternalJobListener(jobMgr);
         errLogger = new ErrorLogger();
         addInternalSchedulerListener(errLogger);

         signaler = new SchedulerSignalerImpl(this, schedThread);

         // The update-check timer exists only when the check is enabled.
         updateTimer = shouldRunUpdateCheck() ? scheduleUpdateCheck() : null;

         getLog().info("Quartz Scheduler v." + getVersion() + " created.");
     }
    /**
     * Starts the periodic update check on a daemon {@link Timer}.
     * The first UpdateChecker run happens after 1000 ms; afterwards it repeats at a
     * fixed rate of 604800000 ms (7 days).
     *
     * @return the timer driving the update check
     */
    private Timer scheduleUpdateCheck()
    {
        final long firstRunDelayMillis = 1000L;
        final long repeatPeriodMillis = 604800000L; // 7 * 24 * 60 * 60 * 1000

        Timer updateCheckTimer = new Timer(true);
        updateCheckTimer.scheduleAtFixedRate(new UpdateChecker(), firstRunDelayMillis, repeatPeriodMillis);
        return updateCheckTimer;
    }
    /**
     * Decides whether the update check should be scheduled.
     *
     * @return true only when the resources enable the check and neither of the
     *         system properties "org.quartz.scheduler.skipUpdateCheck" and
     *         "org.terracotta.quartz.skipUpdateCheck" is set to true
     */
    private boolean shouldRunUpdateCheck()
    {
        if (!resources.isRunUpdateCheck())
        {
            return false;
        }
        if (Boolean.getBoolean("org.quartz.scheduler.skipUpdateCheck"))
        {
            return false;
        }
        return !Boolean.getBoolean("org.terracotta.quartz.skipUpdateCheck");
    }
      /**
       * Starts (or resumes) the scheduler.
       *
       * Listeners are told the scheduler is starting, the job store is told whether
       * this is the first start or a resume, the scheduler thread is un-paused, and
       * finally listeners are told the scheduler has started.
       *
       * @throws SchedulerException if shutdown has begun or already completed
       */
      public void start()
          throws SchedulerException
      {
          if (shuttingDown || closed)
          {
              throw new SchedulerException("The Scheduler cannot be restarted after shutdown() has been called.");
          }

          // "Starting" notification goes out before any state changes.
          notifySchedulerListenersStarting();

          boolean firstStart = (initialStart == null);
          if (!firstStart)
          {
              resources.getJobStore().schedulerResumed();
          }
          else
          {
              // First start: remember when, tell the job store, and start the plugins.
              initialStart = new Date();
              resources.getJobStore().schedulerStarted();
              startPlugins();
          }

          // Clear the pause flag on the scheduler thread so its loop can proceed.
          schedThread.togglePause(false);

          getLog().info("Scheduler " + resources.getUniqueIdentifier() + " started.");

          // "Started" notification goes out once the thread has been released.
          notifySchedulerListenersStarted();
      }
    /**
     * Tells every scheduler listener that the scheduler is about to start.
     * A listener that throws is logged and does not prevent the remaining
     * listeners from being notified.
     */
    public void notifySchedulerListenersStarting()
    {
        for (Object candidate : buildSchedulerListenerList())
        {
            SchedulerListener listener = (SchedulerListener) candidate;
            try
            {
                listener.schedulerStarting();
            }
            catch (Exception e)
            {
                getLog().error("Error while notifying SchedulerListener of startup.", e);
            }
        }
    }
    /**
     * Tells every scheduler listener that the scheduler has started.
     * A listener that throws is logged and does not prevent the remaining
     * listeners from being notified.
     */
    public void notifySchedulerListenersStarted()
    {
        for (Object candidate : buildSchedulerListenerList())
        {
            SchedulerListener listener = (SchedulerListener) candidate;
            try
            {
                listener.schedulerStarted();
            }
            catch (Exception e)
            {
                getLog().error("Error while notifying SchedulerListener of startup.", e);
            }
        }
    }
    /**
     * Guards operations that require a live scheduler.
     *
     * @throws SchedulerException if isShutdown() reports that the scheduler is shut down
     */
    public void validateState()
        throws SchedulerException
    {
        if (isShutdown())
        {
            throw new SchedulerException("The Scheduler has been shutdown.");
        }
    }
    // Version components; not assigned anywhere in this excerpt.
    private static String VERSION_MAJOR;
    private static String VERSION_MINOR;
    private static String VERSION_ITERATION;
    // Shared resources passed to the constructor (job store, thread executor, ...).
    private QuartzSchedulerResources resources;
    // Scheduler thread created in the constructor and run by the thread executor.
    private QuartzSchedulerThread schedThread;
    private ThreadGroup threadGroup;
    private SchedulerContext context;
    private ListenerManager listenerManager;// listener manager
    // Internal listener registries, created empty in the constructor.
    private HashMap internalJobListeners;
    private HashMap internalTriggerListeners;
    private ArrayList internalSchedulerListeners;
    // Defaults to a PropertySettingJobFactory (set in the constructor).
    private JobFactory jobFactory;
    // Registered as an internal job listener in the constructor.
    ExecutingJobsManager jobMgr;
    // Registered as an internal scheduler listener in the constructor.
    ErrorLogger errLogger;
    private SchedulerSignaler signaler;
    private Random random;
    private ArrayList holdToPreventGC;
    private boolean signalOnSchedulingChange;
    // closed / shuttingDown: start() refuses to run once either flag is set.
    private volatile boolean closed;
    private volatile boolean shuttingDown;
    private boolean boundRemotely;
    private QuartzSchedulerMBean jmxBean;
    // Null until the first start(); start() uses it to tell a first start from a resume.
    private Date initialStart;
    // Timer driving the update check, or null when the check is disabled.
    private final Timer updateTimer;
}

//job存储器
public class RAMJobStore
    implements JobStore
{
    // Callback that QuartzScheduler.start() invokes on its job store the first time
    // the scheduler is started. Deliberately empty in this store; the original author
    // wondered why — presumably an in-memory store has nothing to prepare (confirm).
    public void schedulerStarted()
    {
        // intentionally empty
    }
     // Scheduler-paused callback; this store does nothing in response.
    public void schedulerPaused()
    {
    }
     // Callback that QuartzScheduler.start() invokes on its job store when the scheduler
     // had already been started before (a resume); this store does nothing in response.
    public void schedulerResumed()
    {
    }
    // Collects the next triggers to fire: repeatedly removes the first entry of
    // timeTriggers and adds a clone of its trigger to the result, until the set is
    // exhausted, a trigger lies beyond noLaterThan + timeWindow, or maxCount triggers
    // have been collected.
    //
    // NOTE(review): this is raw decompiler output and is not compilable as shown.
    // "JVM INSTR monitorenter/monitorexit" stand for a synchronized(lock) block, and the
    // _Ln labels with goto/break/continue stand for a loop — presumably a while(true)
    // in the original source; confirm against the real Quartz source before relying
    // on the exact jump targets.
     public List acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow)
    {
        Object obj = lock;
        JVM INSTR monitorenter ;
        List result;
        Set acquiredJobKeysForNoConcurrentExec;
        Set excludedTriggers;
        long firstAcquiredTriggerFireTime;
        result = new ArrayList();
        acquiredJobKeysForNoConcurrentExec = new HashSet();
        excludedTriggers = new HashSet();
        firstAcquiredTriggerFireTime = 0L;
        // Nothing scheduled at all: return the empty result immediately.
        if(timeTriggers.size() == 0)
            return result;
_L2:    // loop head: look at the first (lowest-ordered) entry of timeTriggers (TreeSet<TriggerWrapper>)
        TriggerWrapper tw = (TriggerWrapper)timeTriggers.first();
        JobKey jobKey;
        if(tw == null)
            break; /* Loop/switch isn't completed */
        try
        {
            // take tw out of timeTriggers while it is being examined
            timeTriggers.remove(tw);
        }
        catch(NoSuchElementException nsee)
        {
            break; /* Loop/switch isn't completed */
        }
        // A trigger without a next fire time is left out of the set; move on.
        if(tw.trigger.getNextFireTime() == null)
            continue; /* Loop/switch isn't completed */
        // Apply misfire handling; true means the trigger was handled as a misfire.
        if(applyMisfire(tw))
        {
            // If the misfire update left a next fire time, put tw back into timeTriggers.
            if(tw.trigger.getNextFireTime() != null)
                timeTriggers.add(tw);
            continue; /* Loop/switch isn't completed */
        }
        if(tw.getTrigger().getNextFireTime().getTime() > noLaterThan + timeWindow)
        {
            // Next fire time is beyond the acquisition horizon: put tw back and stop scanning.
            timeTriggers.add(tw);
            break; /* Loop/switch isn't completed */
        }
        jobKey = tw.trigger.getJobKey();
        JobDetail job = ((JobWrapper)jobsByKey.get(tw.trigger.getJobKey())).jobDetail;
        // Jobs that allow concurrent execution skip the per-job bookkeeping below
        // (the jump target was lost by the decompiler — presumably the acquisition code after _L1).
        if(!job.isConcurrentExectionDisallowed())
            break MISSING_BLOCK_LABEL_259;
        // Non-concurrent job not yet acquired in this batch: leave this check and fall to _L1.
        if(!acquiredJobKeysForNoConcurrentExec.contains(jobKey))
            break; /* Loop/switch isn't completed */
        // Non-concurrent job already acquired in this batch: set tw aside (it is added back
        // to timeTriggers at _L3) and look at the next trigger.
        excludedTriggers.add(tw);
        if(true) goto _L2; else goto _L1
_L1:
        // Acquire tw: remember its job key, mark state 1 (the state triggersFired() requires;
        // presumably "acquired" — confirm), stamp a fire-instance id and hand out a clone.
        acquiredJobKeysForNoConcurrentExec.add(jobKey);
        tw.state = 1;
        tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
        OperableTrigger trig = (OperableTrigger)tw.trigger.clone();
        result.add(trig);
        if(firstAcquiredTriggerFireTime == 0L)
            firstAcquiredTriggerFireTime = tw.trigger.getNextFireTime().getTime();
        // Keep scanning until maxCount triggers have been collected.
        if(result.size() != maxCount) goto _L2; else goto _L3
_L3:
        if(excludedTriggers.size() > 0)
            // put the triggers that were set aside back into timeTriggers
            timeTriggers.addAll(excludedTriggers);
        // Decompiler residue: presumably "return result" on leaving the synchronized block,
        // followed by the monitor-release exception handler.
        result;
        obj;
        JVM INSTR monitorexit ;
        return;
        Exception exception;
        exception;
        throw exception;
    }
    //获取就绪触发任务包装类集
     public List triggersFired(List firedTriggers)
    {
        Object obj = lock;
        JVM INSTR monitorenter ;
        List results;//List<TriggerFiredBundle>
        results = new ArrayList();
        Iterator i$ = firedTriggers.iterator();
        do
        {
            if(!i$.hasNext())
                break;
            OperableTrigger trigger = (OperableTrigger)i$.next();
	    //根据triggerKey从triggersByKey中获取TriggerWrapper-tw
            TriggerWrapper tw = (TriggerWrapper)triggersByKey.get(trigger.getKey());
            if(tw == null || tw.trigger == null || tw.state != 1)
                continue;
            Calendar cal = null;
            if(tw.trigger.getCalendarName() != null)
            {
                cal = retrieveCalendar(tw.trigger.getCalendarName());
                if(cal == null)
                    continue;
            }
            Date prevFireTime = trigger.getPreviousFireTime();
            timeTriggers.remove(tw);
            tw.trigger.triggered(cal);
            trigger.triggered(cal);
            tw.state = 0;
	    //包装触发任务
            TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(tw.jobKey), trigger, cal, false, new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
            JobDetail job = bndle.getJobDetail();
            if(job.isConcurrentExectionDisallowed())
            {
                ArrayList trigs = getTriggerWrappersForJob(job.getKey());
                TriggerWrapper ttw;
                for(Iterator i$ = trigs.iterator(); i$.hasNext(); timeTriggers.remove(ttw))
                {
                    ttw = (TriggerWrapper)i$.next();
                    if(ttw.state == 0)
                        ttw.state = 5;
                    if(ttw.state == 4)
                        ttw.state = 6;
                }
                
                blockedJobs.add(job.getKey());
            } else
            if(tw.trigger.getNextFireTime() != null)
                synchronized(lock)
                {
                    timeTriggers.add(tw);
                }
	    //List<TriggerFiredBundle>,触发任务包装类添加到结果集
            results.add(new TriggerFiredResult(bndle));
        } while(true);
        return results;
        Exception exception1;
        exception1;
        throw exception1;
    }

    /**
     * Checks whether the wrapped trigger has misfired and, if so, lets the trigger
     * update itself.
     *
     * A trigger counts as misfired when its next fire time is not later than
     * "now minus the misfire threshold" (the threshold is subtracted only when
     * positive) and its misfire instruction is not -1.
     *
     * @param tw the trigger wrapper to examine
     * @return false when no misfire handling applied, or when updateAfterMisfire left
     *         the next fire time unchanged; true otherwise (including the case where
     *         the trigger has no further fire time and is marked with state 3)
     */
    protected boolean applyMisfire(TriggerWrapper tw)
    {
        long misfireCutoff = System.currentTimeMillis();
        if (getMisfireThreshold() > 0L)
        {
            misfireCutoff = misfireCutoff - getMisfireThreshold();
        }

        Date scheduledFire = tw.trigger.getNextFireTime();
        if (scheduledFire == null)
        {
            return false;
        }
        if (scheduledFire.getTime() > misfireCutoff)
        {
            // Still within the tolerated window: not a misfire.
            return false;
        }
        if (tw.trigger.getMisfireInstruction() == -1)
        {
            // Instruction -1 exempts the trigger from misfire handling here.
            return false;
        }

        Calendar cal = (tw.trigger.getCalendarName() != null)
            ? retrieveCalendar(tw.trigger.getCalendarName())
            : null;

        // Listeners receive a clone, then the trigger recomputes its schedule.
        signaler.notifyTriggerListenersMisfired((OperableTrigger) tw.trigger.clone());
        tw.trigger.updateAfterMisfire(cal);

        if (tw.trigger.getNextFireTime() != null)
        {
            // An unchanged fire time is reported as "not handled".
            return !scheduledFire.equals(tw.trigger.getNextFireTime());
        }

        // No further fire time: set state 3 (the value triggeredJobComplete() uses for
        // SET_TRIGGER_COMPLETE), announce finalization and drop it from timeTriggers.
        tw.state = 3;
        signaler.notifySchedulerListenersFinalized(tw.trigger);
        synchronized (lock)
        {
            timeTriggers.remove(tw);
        }
        return true;
    }
    /**
     * Returns clones of all triggers that belong to the given job.
     *
     * @param jobKey key of the job whose triggers are wanted
     * @return a new list holding a clone of every matching trigger; empty when none match
     */
    public List getTriggersForJob(JobKey jobKey)
    {
        ArrayList matches = new ArrayList();
        synchronized (lock)
        {
            for (Object entry : triggers)
            {
                TriggerWrapper wrapper = (TriggerWrapper) entry;
                if (wrapper.jobKey.equals(jobKey))
                {
                    // Hand out a clone so callers cannot mutate the stored trigger.
                    matches.add((OperableTrigger) wrapper.trigger.clone());
                }
            }
        }
        return matches;
    }
     /**
      * Post-processing after a job execution has finished, performed under the store lock.
      *
      * Job side: when the job is still stored and persists its data, the stored detail
      * is rebuilt with a clean clone of the executed job's data map; when the job
      * disallows concurrent execution, it is unblocked, its triggers are restored
      * (state 5 becomes 0 and is re-queued, state 6 becomes 4) and the scheduler is
      * signaled. When the job is no longer stored, only its key is removed from
      * blockedJobs.
      *
      * Trigger side: when the trigger is still stored, the completion instruction
      * decides whether it is removed, set to state 3, or set to state 7 (logged as
      * ERROR), for this trigger alone or for all triggers of the job.
      *
      * @param trigger         the trigger that fired
      * @param jobDetail       the detail of the job that was executed
      * @param triggerInstCode instruction describing what to do with the trigger
      */
     public void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail, org.quartz.Trigger.CompletedExecutionInstruction triggerInstCode)
     {
         synchronized (lock)
         {
             JobWrapper jobWrapper = (JobWrapper) jobsByKey.get(jobDetail.getKey());
             TriggerWrapper triggerWrapper = (TriggerWrapper) triggersByKey.get(trigger.getKey());

             if (jobWrapper == null)
             {
                 // The job is gone from the store: just make sure it is not left blocked.
                 blockedJobs.remove(jobDetail.getKey());
             }
             else
             {
                 JobDetail storedJob = jobWrapper.jobDetail;
                 if (storedJob.isPersistJobDataAfterExecution())
                 {
                     JobDataMap dataCopy = jobDetail.getJobDataMap();
                     if (dataCopy != null)
                     {
                         dataCopy = (JobDataMap) dataCopy.clone();
                         dataCopy.clearDirtyFlag();
                     }
                     storedJob = storedJob.getJobBuilder().setJobData(dataCopy).build();
                     jobWrapper.jobDetail = storedJob;
                 }
                 if (storedJob.isConcurrentExectionDisallowed())
                 {
                     blockedJobs.remove(storedJob.getKey());
                     for (Object entry : getTriggerWrappersForJob(storedJob.getKey()))
                     {
                         TriggerWrapper sibling = (TriggerWrapper) entry;
                         if (sibling.state == 5)
                         {
                             sibling.state = 0;
                             timeTriggers.add(sibling);
                         }
                         if (sibling.state == 6)
                         {
                             sibling.state = 4;
                         }
                     }
                     signaler.signalSchedulingChange(0L);
                 }
             }

             if (triggerWrapper == null)
             {
                 // The trigger is no longer stored: nothing left to update.
                 return;
             }

             if (triggerInstCode == org.quartz.Trigger.CompletedExecutionInstruction.DELETE_TRIGGER)
             {
                 if (trigger.getNextFireTime() != null)
                 {
                     removeTrigger(trigger.getKey());
                     signaler.signalSchedulingChange(0L);
                 }
                 else if (triggerWrapper.getTrigger().getNextFireTime() == null)
                 {
                     // Removed only when the stored copy has no next fire time either.
                     removeTrigger(trigger.getKey());
                 }
             }
             else if (triggerInstCode == org.quartz.Trigger.CompletedExecutionInstruction.SET_TRIGGER_COMPLETE)
             {
                 // Mark complete, take it out of the time-ordered set and tell the scheduler.
                 triggerWrapper.state = 3;
                 timeTriggers.remove(triggerWrapper);
                 signaler.signalSchedulingChange(0L);
             }
             else if (triggerInstCode == org.quartz.Trigger.CompletedExecutionInstruction.SET_TRIGGER_ERROR)
             {
                 getLog().info("Trigger " + trigger.getKey() + " set to ERROR state.");
                 triggerWrapper.state = 7;
                 signaler.signalSchedulingChange(0L);
             }
             else if (triggerInstCode == org.quartz.Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR)
             {
                 getLog().info("All triggers of Job " + trigger.getJobKey() + " set to ERROR state.");
                 setAllTriggersOfJobToState(trigger.getJobKey(), 7);
                 signaler.signalSchedulingChange(0L);
             }
             else if (triggerInstCode == org.quartz.Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_COMPLETE)
             {
                 setAllTriggersOfJobToState(trigger.getJobKey(), 3);
                 signaler.signalSchedulingChange(0L);
             }
         }
     }


    protected HashMap jobsByKey;// HashMap<JobKey, JobWrapper>
    protected HashMap triggersByKey;// HashMap<TriggerKey, TriggerWrapper>
    protected HashMap jobsByGroup;// HashMap<String, HashMap<JobKey, JobWrapper>>, keyed by the JobKey's group
    protected HashMap triggersByGroup;// HashMap<String, HashMap<TriggerKey, TriggerWrapper>>, keyed by the TriggerKey's group
    protected TreeSet timeTriggers;// TreeSet<TriggerWrapper> (a TreeSet is backed by a red-black tree)
    protected HashMap calendarsByName;
    protected ArrayList triggers; // List<TriggerWrapper>
    // Monitor guarding the store's collections (see the synchronized(lock) blocks).
    protected final Object lock = new Object();
    protected HashSet pausedTriggerGroups;
    protected HashSet pausedJobGroups;
    // Keys of jobs currently blocked because they disallow concurrent execution.
    protected HashSet blockedJobs;
    protected long misfireThreshold;
    protected SchedulerSignaler signaler;
    private final Logger log = LoggerFactory.getLogger(getClass());
    // Counter seeded with the current time in milliseconds at class-load time.
    private static final AtomicLong ftrCtr = new AtomicLong(System.currentTimeMillis());
}  

/**
 * JMX MBean implementation for the scheduler (excerpt). It is also a JobListener and
 * a SchedulerListener, so scheduler events reach it through the listener callbacks.
 */
public class QuartzSchedulerMBeanImpl extends StandardMBean
    implements NotificationEmitter, QuartzSchedulerMBean, JobListener, SchedulerListener
{
    /** Emits a "schedulerStarted" JMX notification once the scheduler has started. */
    public void schedulerStarted()
    {
        sendNotification("schedulerStarted");
    }

    /** Called while the scheduler is starting; nothing is emitted for this event. */
    public void schedulerStarting()
    {
    }

    // Notification metadata, built once in the static initializer below.
    private static final MBeanNotificationInfo NOTIFICATION_INFO[];
    private final QuartzScheduler scheduler;
    private boolean sampledStatisticsEnabled;
    private SampledStatistics sampledStatistics;
    private static final SampledStatistics NULL_SAMPLED_STATISTICS = new NullSampledStatisticsImpl();
    protected final Emitter emitter = new Emitter();
    protected final AtomicLong sequenceNumber = new AtomicLong();

    static
    {
        String notifTypes[] = {
            "schedulerStarted", "schedulerPaused", "schedulerShutdown"
        };
        // The decompiler printed the class literal as "javax/management/Notification";
        // the valid Java form is the .class literal.
        String name = javax.management.Notification.class.getName();
        String description = "QuartzScheduler JMX Event";
        NOTIFICATION_INFO = (new MBeanNotificationInfo[] {
            new MBeanNotificationInfo(notifTypes, name, description)
        });
    }
}

//QuartzSchedulerThread,调度器线程
public class QuartzSchedulerThread extends Thread
{

    // Convenience constructor: delegates to the four-argument constructor, taking the
    // daemon flag from qsRsrcs.getMakeSchedulerThreadDaemon() and passing 5 as the last
    // argument (presumably the thread priority, i.e. Thread.NORM_PRIORITY — confirm
    // against the four-argument constructor, which is not shown in this excerpt).
    QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs)
    {
        this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), 5);
    }
   /**
    * Pauses or un-pauses the scheduler thread's main loop.
    *
    * When un-pausing, every thread waiting on sigLock is woken so the loop can leave
    * its paused wait; when pausing, a scheduling change is signaled instead.
    *
    * @param pause true to pause the loop, false to let it run
    */
   void togglePause(boolean pause)
   {
       synchronized (sigLock)
       {
           paused = pause;
           if (!paused)
           {
               sigLock.notifyAll();
               return;
           }
           signalSchedulingChange(0L);
       }
   }
    // Records that the schedule has changed and wakes every thread waiting on sigLock.
    // candidateNewNextFireTime is stored as signaledNextFireTime; callers in this
    // excerpt pass 0L.
    public void signalSchedulingChange(long candidateNewNextFireTime)
    {
        synchronized(sigLock)
        {
            signaled = true;
            signaledNextFireTime = candidateNewNextFireTime;
            sigLock.notifyAll();
        }
    }
    // Main loop of the scheduler thread. Each pass: wait while paused, wait for a free
    // worker thread, acquire the next triggers from the job store, wait until the first
    // one is due, mark them fired, and hand each resulting bundle to the thread pool.
    //
    // NOTE(review): this is raw decompiler output and is not compilable as shown.
    // "JVM INSTR monitorexit" and the bare "obj;" / "exception1;" style lines are residue
    // of synchronized blocks, and the _Ln labels with goto/break/continue stand for the
    // original loops; confirm exact jump targets against the real Quartz source.
    public void run()
    {
        // Remembers a failed acquisition so the same error is not reported on every pass.
        boolean lastAcquireFailed = false;
_L15:
        // Outer loop head: leave the loop once the thread has been halted.
        if(halted.get())
            break; /* Loop/switch isn't completed */
label0:
        {
            synchronized(sigLock)
            {
                // While paused (and not halted) wait on sigLock in slices of 1000 ms.
                while(paused && !halted.get()) 
                    try
                    {
                        sigLock.wait(1000L);
                    }
                    catch(InterruptedException ignore) { }
                if(!halted.get())
                    break label0;
            }
            break; /* Loop/switch isn't completed */
        }
        obj;
        JVM INSTR monitorexit ;
          goto _L1
        exception1;
        throw exception1;
_L1:
        int availThreadCount;
        List triggers;
        long now;
        // Ask the thread pool how many worker threads are available (this call blocks).
        availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
        if(availThreadCount <= 0)
            continue; /* Loop/switch isn't completed */
        triggers = null;
        now = System.currentTimeMillis();
        // Reset the "schedule changed" signal before acquiring.
        clearSignaledSchedulingChange();
        try
        {
            // Acquire triggers due no later than now + idleWaitTime, at most
            // min(available threads, max batch size), using the configured batch time window.
            triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
            lastAcquireFailed = false;
            if(log.isDebugEnabled())
                log.debug((new StringBuilder()).append("batch acquisition of ").append(triggers != null ? triggers.size() : 0).append(" triggers").toString());
        }
        catch(JobPersistenceException jpe)
        {
            // Notify listeners only on the first failure in a row, then retry the loop.
            if(!lastAcquireFailed)
                qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.", jpe);
            lastAcquireFailed = true;
            continue; /* Loop/switch isn't completed */
        }
        catch(RuntimeException e)
        {
            // Log only on the first failure in a row, then retry the loop.
            if(!lastAcquireFailed)
                getLog().error((new StringBuilder()).append("quartzSchedulerThreadLoop: RuntimeException ").append(e.getMessage()).toString(), e);
            lastAcquireFailed = true;
            continue; /* Loop/switch isn't completed */
        }
        // Nothing acquired: go to the idle wait at _L3.
        if(triggers == null || triggers.isEmpty()) goto _L3; else goto _L2
_L2:
        long triggerTime;
        long timeUntilTrigger;
        now = System.currentTimeMillis();
        triggerTime = ((OperableTrigger)triggers.get(0)).getNextFireTime().getTime();
        // Milliseconds remaining until the first acquired trigger is due.
        timeUntilTrigger = triggerTime - now;
_L6:
label1:
        {
            // Wait loop: stop waiting once the trigger is due within 2 ms.
            if(timeUntilTrigger <= 2L)
                break; /* Loop/switch isn't completed */
            synchronized(sigLock)
            {
                if(!halted.get())
                    break label1;
            }
            break; /* Loop/switch isn't completed */
        }
        // Unless isCandidateNewTimeEarlierWithinReason(...) returns true, wait on sigLock
        // for the remaining time (only when at least 1 ms remains).
        if(!isCandidateNewTimeEarlierWithinReason(triggerTime, false))
            try
            {
                now = System.currentTimeMillis();
                timeUntilTrigger = triggerTime - now;
                if(timeUntilTrigger >= 1L)
                    sigLock.wait(timeUntilTrigger);
            }
            catch(InterruptedException ignore) { }
        obj2;
        JVM INSTR monitorexit ;
          goto _L4
        exception2;
        throw exception2;
_L4:
        // Leave the wait loop when releaseIfScheduleChangedSignificantly(...) returns true.
        if(releaseIfScheduleChangedSignificantly(triggers, triggerTime))
            break; /* Loop/switch isn't completed */
        now = System.currentTimeMillis();
        timeUntilTrigger = triggerTime - now;
        if(true) goto _L6; else goto _L5
_L5:
        if(triggers.isEmpty())
            continue; /* Loop/switch isn't completed */
        List bndles;
        bndles = new ArrayList();
        boolean goAhead = true;
        synchronized(sigLock)
        {
            // Fire only if the thread has not been halted in the meantime.
            goAhead = !halted.get();
        }
        if(goAhead)
            try
            {
                // Tell the job store the acquired triggers are firing; it returns one
                // TriggerFiredResult per fired trigger.
                List res = qsRsrcs.getJobStore().triggersFired(triggers);
                if(res != null)
                    bndles = res;
            }
            catch(SchedulerException se)
            {
                // Firing failed: notify listeners and release every acquired trigger.
                qs.notifySchedulerListenersError((new StringBuilder()).append("An error occurred while firing triggers '").append(triggers).append("'").toString(), se);
                int i = 0;
                while(i < triggers.size()) 
                {
                    qsRsrcs.getJobStore().releaseAcquiredTrigger((OperableTrigger)triggers.get(i));
                    i++;
                }
                continue; /* Loop/switch isn't completed */
            }
        int i = 0;
_L13:
        // Loop over the fired results, indexed in parallel with the triggers list.
        TriggerFiredBundle bndle;
        Exception exception;
        if(i >= bndles.size())
            continue; /* Loop/switch isn't completed */
        TriggerFiredResult result = (TriggerFiredResult)bndles.get(i);
        // Each result carries either a bundle or an exception.
        bndle = result.getTriggerFiredBundle();
        exception = result.getException();
        if(!(exception instanceof RuntimeException)) goto _L8; else goto _L7
_L7:
        // The store reported a RuntimeException for this trigger: log it and release the trigger.
        getLog().error((new StringBuilder()).append("RuntimeException while firing trigger ").append(triggers.get(i)).toString(), exception);
        qsRsrcs.getJobStore().releaseAcquiredTrigger((OperableTrigger)triggers.get(i));
          goto _L9
_L8:
        if(bndle != null) goto _L11; else goto _L10
_L10:
        // No bundle for this trigger: release it.
        qsRsrcs.getJobStore().releaseAcquiredTrigger((OperableTrigger)triggers.get(i));
          goto _L9
_L11:
        // Create the JobRunShell that will execute the job, then initialize it.
        JobRunShell shell = null;
        shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
        // Initialize the shell with the scheduler.
        shell.initialize(qs);
          goto _L12
        // Decompiler residue of a catch(SchedulerException se) around the two calls above:
        // on failure all triggers of the job are put into the error state.
        SchedulerException se;
        se;

        qsRsrcs.getJobStore().triggeredJobComplete((OperableTrigger)triggers.get(i), bndle.getJobDetail(), org.quartz.Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
          goto _L9
_L12:
        // Hand the shell to the thread pool for execution.
        if(!qsRsrcs.getThreadPool().runInThread(shell))
        {
            getLog().error("ThreadPool.runInThread() return false!");
            // The pool refused the shell: report completion with SET_ALL_JOB_TRIGGERS_ERROR
            // so that all triggers of the job are put into the error state.
            qsRsrcs.getJobStore().triggeredJobComplete((OperableTrigger)triggers.get(i), bndle.getJobDetail(), org.quartz.Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
        }
_L9:
        i++;
        if(true) goto _L13; else goto _L3
_L3:
        // Idle path (no triggers acquired): wait getRandomizedIdleWaitTime() ms on sigLock,
        // unless the thread was halted or isScheduleChanged() reports a change.
        try
        {
            long now = System.currentTimeMillis();
            long waitTime = now + getRandomizedIdleWaitTime();
            long timeUntilContinue = waitTime - now;
            synchronized(sigLock)
            {
                try
                {
                    if(!halted.get() && !isScheduleChanged())
                        sigLock.wait(timeUntilContinue);
                }
                catch(InterruptedException ignore) { }
            }
        }
        catch(RuntimeException re)
        {
            getLog().error("Runtime error occurred in main trigger firing loop.", re);
        }
        if(true) goto _L15; else goto _L14
_L14:
        // Loop finished: drop the references to the scheduler and its resources.
        qs = null;
        qsRsrcs = null;
        return;
    }
    // Resets the "schedule changed" signal under sigLock: clears the signaled flag and
    // sets signaledNextFireTime back to 0. run() calls this before each acquisition.
    public void clearSignaledSchedulingChange()
    {
        synchronized(sigLock)
        {
            signaled = false;
            signaledNextFireTime = 0L;
        }
    }
    // Owning scheduler and its resources; both are set to null when run() finishes.
    private QuartzScheduler qs;
    private QuartzSchedulerResources qsRsrcs;
    // Monitor guarding paused / signaled / signaledNextFireTime and used for wait/notifyAll.
    private final Object sigLock;
    // Set by signalSchedulingChange(), cleared by clearSignaledSchedulingChange().
    private boolean signaled;
    private long signaledNextFireTime;
    // Set by togglePause(); run() waits while this is true.
    private boolean paused;
    // run() exits its loop once this is true.
    private AtomicBoolean halted;
    private Random random;
    private static long DEFAULT_IDLE_WAIT_TIME = 30000L;
    // Added to "now" to form the acquisition horizon passed to acquireNextTriggers().
    private long idleWaitTime;
    private int idleWaitVariablness;
    private final Logger log;
}

//简单线程池
public class SimpleThreadPool
    implements ThreadPool
{
 /**
  * Initializes the pool: validates the configuration, chooses the thread group,
  * then creates and starts the worker threads. Per the original author's note this
  * is invoked during StdSchedulerFactory initialization.
  *
  * Calling it again once workers exist is a no-op.
  *
  * Thread group selection: either the initializing thread's own group is used, or a
  * new group named "&lt;schedulerInstanceName&gt;-SimpleThreadPool" is created under the
  * ancestor group named "main". If no ancestor is named "main" (for example when the
  * initializing thread lives directly in the root group), the new group is created
  * under the root group instead of failing with a NullPointerException.
  *
  * @throws SchedulerConfigException if the thread count is not positive or the
  *         priority is outside 1..9
  */
 public void initialize()
     throws SchedulerConfigException
 {
     // Already initialized: nothing to do.
     if(workers != null && workers.size() > 0)
         return;
     if(count <= 0)
         throw new SchedulerConfigException("Thread count must be > 0");
     if(prio <= 0 || prio > 9)
         throw new SchedulerConfigException("Thread priority must be > 0 and <= 9");

     if(isThreadsInheritGroupOfInitializingThread())
     {
         threadGroup = Thread.currentThread().getThreadGroup();
     } else
     {
         // Climb towards the group named "main"; stop at the root group (whose parent
         // is null) when no ancestor carries that name.
         ThreadGroup parent = Thread.currentThread().getThreadGroup();
         while(!"main".equals(parent.getName()) && parent.getParent() != null)
             parent = parent.getParent();

         threadGroup = new ThreadGroup(parent, (new StringBuilder()).append(schedulerInstanceName).append("-SimpleThreadPool").toString());
         if(isMakeThreadsDaemons())
             threadGroup.setDaemon(true);
     }

     if(isThreadsInheritContextClassLoaderOfInitializingThread())
         getLog().info((new StringBuilder()).append("Job execution threads will use class loader of thread: ").append(Thread.currentThread().getName()).toString());

     // Create the workers, start each one and register it as available.
     for(Iterator workerThreads = createWorkerThreads(count).iterator(); workerThreads.hasNext();)
     {
         WorkerThread wt = (WorkerThread)workerThreads.next();
         wt.start();
         availWorkers.add(wt);
     }
 }
    /**
     * Creates (but does not start) the worker threads and stores them in workers.
     *
     * Threads are named "&lt;prefix&gt;-&lt;n&gt;" with n counting from 1; when no thread name
     * prefix is configured, "&lt;schedulerInstanceName&gt;_Worker" is used as the prefix.
     *
     * @param createCount number of worker threads to create
     * @return the freshly created list of workers (the same object now held in workers)
     */
    protected List createWorkerThreads(int createCount)
    {
        workers = new LinkedList();
        int workerNumber = 1;
        while (workerNumber <= createCount)
        {
            String namePrefix = getThreadNamePrefix();
            if (namePrefix == null)
            {
                namePrefix = schedulerInstanceName + "_Worker";
            }
            String workerName = namePrefix + "-" + workerNumber;

            WorkerThread worker = new WorkerThread(this, threadGroup, workerName, getThreadPriority(), isMakeThreadsDaemons());
            if (isThreadsInheritContextClassLoaderOfInitializingThread())
            {
                // Workers share the context class loader of the initializing thread.
                worker.setContextClassLoader(Thread.currentThread().getContextClassLoader());
            }
            workers.add(worker);
            workerNumber++;
        }
        return workers;
    }
//获取线程池中可利用线程的数量
public int blockForAvailableThreads()
    {
        Object obj = nextRunnableLock;
        JVM INSTR monitorenter ;
        while((availWorkers.size() < 1 || handoffPending) && !isShutdown) 
            try
            {
                nextRunnableLock.wait(500L);
            }
            catch(InterruptedException ignore) { }
        return availWorkers.size();
        Exception exception;
        exception;
        throw exception;
    }
    // Per the original author's note, these fields are populated while
    // StdSchedulerFactory initializes, through bean-property injection such as:
    // tProps = cfg.getPropertyGroup("org.quartz.threadExecutor", true);
    //threadExecutor = (ThreadExecutor)loadHelper.loadClass(threadExecutorClass).newInstance();
    //setBeanProps(threadExecutor, tProps)
    // NOTE(review): the snippet above refers to the thread executor; for this thread pool
    // the analogous thread-pool properties presumably apply — confirm.
    private int count; // pool size (must be > 0, checked in initialize())
    private int prio;// thread priority (must be within 1..9, checked in initialize())
    private boolean isShutdown;
    private boolean handoffPending;
    private boolean inheritLoader;
    private boolean inheritGroup;
    private boolean makeThreadsDaemons;
    private ThreadGroup threadGroup;
    // Monitor that blockForAvailableThreads() waits on.
    private final Object nextRunnableLock;
    private List workers;// List<WorkerThread>
    private LinkedList availWorkers;// LinkedList<WorkerThread>
    private LinkedList busyWorkers;
    private String threadNamePrefix;
    private final Logger log;
    private String schedulerInstanceName;
}

/**
 * Timer task that checks whether updated Quartz versions are available and
 * logs any notices it finds.
 *
 * <p>The helpers {@code buildUpdateCheckUrl()}, {@code getUpdateProperties(URL)},
 * {@code getQuartzVersion()} and {@code notBlank(String)} are not part of this
 * excerpt.
 */
public class UpdateChecker extends TimerTask
{
    // The decompiled listing had the class literal in slash form
    // (org/quartz/utils/UpdateChecker), which is not valid Java source.
    private static final Logger LOG = LoggerFactory.getLogger(UpdateChecker.class);
    private static final long MILLIS_PER_SECOND = 1000L;
    private static final String UNKNOWN = "UNKNOWN";
    private static final String UPDATE_CHECK_URL = "http://www.terracotta.org/kit/reflector?kitID=quartz&pageID=update.properties";
    private static final long START_TIME = System.currentTimeMillis();
    private static final String PRODUCT_NAME = "Quartz";

    /** Timer entry point; delegates to {@link #checkForUpdate()}. */
    @Override
    public void run()
    {
        checkForUpdate();
    }

    /**
     * Runs the update check on a best-effort basis: any failure is logged at
     * debug level and never propagated to the caller.
     */
    public void checkForUpdate()
    {
        try
        {
            doCheck();
        }
        catch(Throwable t)
        {
            LOG.debug("Quartz version update check failed: " + t.getMessage());
        }
    }

    /**
     * Loads the update properties and logs the general notice, the notices for
     * the current version, and the list of newer versions (each followed by its
     * release notes in brackets when present).
     *
     * @throws IOException declared for the property download performed by the helpers
     */
    private void doCheck()
        throws IOException
    {
        LOG.debug("Checking for available updated version of Quartz...");
        URL updateUrl = buildUpdateCheckUrl();
        Properties updateProps = getUpdateProperties(updateUrl);
        String currentVersion = getQuartzVersion();

        String generalNotice = updateProps.getProperty("general.notice");
        if(notBlank(generalNotice))
        {
            LOG.info(generalNotice);
        }

        String versionNotices = updateProps.getProperty(currentVersion + ".notices");
        if(notBlank(versionNotices))
        {
            LOG.info(versionNotices);
        }

        String updates = updateProps.getProperty(currentVersion + ".updates");
        if(!notBlank(updates))
        {
            return;
        }

        StringBuilder sb = new StringBuilder();
        String[] newVersions = updates.split(",");
        for(int i = 0; i < newVersions.length; i++)
        {
            String newVersion = newVersions[i].trim();
            if(i > 0)
            {
                sb.append(", ");
            }
            sb.append(newVersion);
            // Separate local so the comma-separated update list is never overwritten mid-loop.
            String releaseNotes = updateProps.getProperty(newVersion + ".release-notes");
            if(notBlank(releaseNotes))
            {
                sb.append(" [").append(releaseNotes).append("]");
            }
        }

        if(sb.length() > 0)
        {
            LOG.info("New Quartz update(s) found: " + sb);
        }
    }

    /** Runs a single update check from the command line. */
    public static void main(String args[])
    {
        (new UpdateChecker()).run();
    }

}

/**
 * Factory that creates the run shell for a fired trigger's job.
 */
public class JTAJobRunShellFactory
    implements JobRunShellFactory
{
    /** Scheduler handed to every shell this factory creates; set by initialize(). */
    private Scheduler scheduler;

    /**
     * Remembers the scheduler for later shell creation.
     *
     * @param sched scheduler to pass to created shells
     * @throws SchedulerConfigException declared by the signature; not thrown by this body
     */
    public void initialize(Scheduler sched)
        throws SchedulerConfigException
    {
        this.scheduler = sched;
    }

    /**
     * Creates a new JTAJobRunShell for the given fired-trigger bundle.
     *
     * @param bundle fired trigger bundle to run
     * @return a freshly constructed JTAJobRunShell
     * @throws SchedulerException declared by the signature; not thrown by this body
     */
    public JobRunShell createJobRunShell(TriggerFiredBundle bundle)
        throws SchedulerException
    {
        JobRunShell shell = new JTAJobRunShell(this.scheduler, bundle);
        return shell;
    }
}

/**
 * Run shell for triggered jobs, as created by JTAJobRunShellFactory.
 */
public class JTAJobRunShell extends JobRunShell
{

    /**
     * Builds the shell via the JobRunShell constructor and clears the
     * transaction timeout.
     *
     * @param scheduler scheduler passed through to the superclass
     * @param bndle     fired trigger bundle passed through to the superclass
     */
    public JTAJobRunShell(Scheduler scheduler, TriggerFiredBundle bndle)
    {
        super(scheduler, bndle);
        // transactionTimeout is declared outside this excerpt.
        this.transactionTimeout = null;
    }
}
public class JobRunShell extends SchedulerListenerSupport
    implements Runnable
{
public JobRunShell(Scheduler scheduler, TriggerFiredBundle bndle)
    {
        jec = null;
        qs = null;
        firedTriggerBundle = null;
        this.scheduler = null;
        shutdownRequested = false;
        this.scheduler = scheduler;
        firedTriggerBundle = bndle;
    }
     public void run()
    {
        //添加到内部监听器
        qs.addInternalSchedulerListener(this);
label0:
        {
	   //protected JobExecutionContextImpl jec,job执行上下文
            OperableTrigger trigger = (OperableTrigger)jec.getTrigger();
            JobDetail jobDetail = jec.getJobDetail();
            org.quartz.Trigger.CompletedExecutionInstruction instCode;
            do
            {
                JobExecutionException jobExEx = null;
                Job job = jec.getJobInstance();
                try
                {
                    begin();
                }
                catch(SchedulerException se)
                {
                    qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn't begin execution.").toString(), se);
                    break label0;
                }
                try
                {
                    if(!notifyListenersBeginning(jec))
                        break label0;
                }
                catch(VetoedException ve)
                {
                    try
                    {
                        org.quartz.Trigger.CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
                        qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);
                        if(jec.getTrigger().getNextFireTime() == null)
                            qs.notifySchedulerListenersFinalized(jec.getTrigger());
                        complete(true);
                    }
                    catch(SchedulerException se)
                    {
                        qs.notifySchedulerListenersError((new StringBuilder()).append("Error during veto of Job (").append(jec.getJobDetail().getKey()).append(": couldn't finalize execution.").toString(), se);
                    }
                    break label0;
                }
                long startTime = System.currentTimeMillis();
                long endTime = startTime;
                try
                {
                    log.debug((new StringBuilder()).append("Calling execute on job ").append(jobDetail.getKey()).toString());
                    //执行Job,关键
		    job.execute(jec);
                    endTime = System.currentTimeMillis();
                }
                catch(JobExecutionException jee)
                {
                    endTime = System.currentTimeMillis();
                    jobExEx = jee;
                    getLog().info((new StringBuilder()).append("Job ").append(jobDetail.getKey()).append(" threw a JobExecutionException: ").toString(), jobExEx);
                }
                catch(Throwable e)
                {
                    endTime = System.currentTimeMillis();
                    getLog().error((new StringBuilder()).append("Job ").append(jobDetail.getKey()).append(" threw an unhandled Exception: ").toString(), e);
                    SchedulerException se = new SchedulerException("Job threw an unhandled exception.", e);
                    qs.notifySchedulerListenersError((new StringBuilder()).append("Job (").append(jec.getJobDetail().getKey()).append(" threw an exception.").toString(), se);
                    jobExEx = new JobExecutionException(se, false);
                }
		//设置jJobExecutionContext运行时间
                jec.setJobRunTime(endTime - startTime);
                if(!notifyJobListenersComplete(jec, jobExEx))
                    break label0;
                instCode = org.quartz.Trigger.CompletedExecutionInstruction.NOOP;
                try
                {
                    instCode = trigger.executionComplete(jec, jobExEx);
                }
                catch(Exception e)
                {
                    SchedulerException se = new SchedulerException("Trigger threw an unhandled exception.", e);
                    qs.notifySchedulerListenersError("Please report this error to the Quartz developers.", se);
                }
                if(!notifyTriggerListenersComplete(jec, instCode))
                    break label0;
                if(instCode == org.quartz.Trigger.CompletedExecutionInstruction.RE_EXECUTE_JOB)
                {
                    jec.incrementRefireCount();
                    try
                    {
                        complete(false);
                    }
                    catch(SchedulerException se)
                    {
                        qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn't finalize execution.").toString(), se);
                    }
                    continue;
                }
                try
                {
                    complete(true);
                    break;
                }
                catch(SchedulerException se)
                {
                    qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn't finalize execution.").toString(), se);
                }
            } while(true);
	    //通知job执行完成
            qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
        }
        qs.removeInternalSchedulerListener(this);
        break MISSING_BLOCK_LABEL_710;
        Exception exception;
        exception;
        qs.removeInternalSchedulerListener(this);
        throw exception;
    }
    protected JobExecutionContextImpl jec;//job执行上下文
    protected QuartzScheduler qs;
    protected TriggerFiredBundle firedTriggerBundle;
    protected Scheduler scheduler;
    protected volatile boolean shutdownRequested;
    private final Logger log = LoggerFactory.getLogger(getClass());
}

/**
 * Wrapper that holds a trigger together with its TriggerKey, its JobKey and a
 * state code. Equality and hashing are based on the trigger key only.
 */
class TriggerWrapper
{
    public static final int STATE_WAITING = 0; // waiting
    public static final int STATE_ACQUIRED = 1; // acquired (ready)
    public static final int STATE_EXECUTING = 2; // executing
    public static final int STATE_COMPLETE = 3; // complete
    public static final int STATE_PAUSED = 4; // paused
    public static final int STATE_BLOCKED = 5; // blocked
    public static final int STATE_PAUSED_BLOCKED = 6; // paused and blocked
    public static final int STATE_ERROR = 7; // error

    public final TriggerKey key;
    public final JobKey jobKey;
    public final OperableTrigger trigger;
    public int state = STATE_WAITING;

    /**
     * Wraps the given trigger and captures its key and job key.
     *
     * @param trigger trigger to wrap
     * @throws IllegalArgumentException if {@code trigger} is null
     */
    TriggerWrapper(OperableTrigger trigger)
    {
        if(trigger == null)
        {
            throw new IllegalArgumentException("Trigger cannot be null!");
        }
        this.trigger = trigger;
        this.key = trigger.getKey();
        this.jobKey = trigger.getJobKey();
    }

    /**
     * @return true only when {@code obj} is a TriggerWrapper whose key equals this key
     */
    public boolean equals(Object obj)
    {
        if(!(obj instanceof TriggerWrapper))
        {
            return false;
        }
        TriggerWrapper other = (TriggerWrapper)obj;
        return other.key.equals(key);
    }

    /** @return the hash code of the trigger key */
    public int hashCode()
    {
        return key.hashCode();
    }

    /** @return the wrapped trigger */
    public OperableTrigger getTrigger()
    {
        return this.trigger;
    }
}

/**
 * Simple trigger (excerpt: only the next-fire-time accessor and the fields are shown).
 */
public class SimpleTriggerImpl extends AbstractTrigger
    implements SimpleTrigger, CoreTrigger
{
    private Date startTime;
    private Date endTime;
    private Date nextFireTime;
    private Date previousFireTime;
    private int repeatCount;
    private long repeatInterval;
    private int timesTriggered;
    private boolean complete;

    /**
     * @return the stored next fire time
     */
    public Date getNextFireTime()
    {
        return this.nextFireTime;
    }

}

/**
 * Bundle describing one fired trigger: the job, the trigger, the calendar and
 * the fire-time values passed to the constructor.
 */
public class TriggerFiredBundle
    implements Serializable
{
    private JobDetail job;
    private OperableTrigger trigger;
    private Calendar cal;
    private boolean jobIsRecovering;
    private Date fireTime;
    private Date scheduledFireTime;
    private Date prevFireTime;
    private Date nextFireTime;

    /**
     * Stores every argument in the field of the same name.
     *
     * @param job               job detail of the fired trigger
     * @param trigger           the fired trigger
     * @param cal               calendar passed by the caller
     * @param jobIsRecovering   recovery flag passed by the caller
     * @param fireTime          fire time
     * @param scheduledFireTime scheduled fire time
     * @param prevFireTime      previous fire time
     * @param nextFireTime      next fire time
     */
    public TriggerFiredBundle(JobDetail job, OperableTrigger trigger, Calendar cal, boolean jobIsRecovering,
            Date fireTime, Date scheduledFireTime, Date prevFireTime, Date nextFireTime)
    {
        // What was fired.
        this.job = job;
        this.trigger = trigger;
        this.cal = cal;
        this.jobIsRecovering = jobIsRecovering;

        // When it fired.
        this.fireTime = fireTime;
        this.scheduledFireTime = scheduledFireTime;
        this.prevFireTime = prevFireTime;
        this.nextFireTime = nextFireTime;
    }
}

/**
 * Result wrapper for a fired trigger; holds the fired-trigger bundle and an
 * exception slot.
 */
public class TriggerFiredResult
{
    private TriggerFiredBundle triggerFiredBundle;
    // Not assigned anywhere in this excerpt.
    private Exception exception;

    /**
     * @param triggerFiredBundle bundle to store as the result
     */
    public TriggerFiredResult(TriggerFiredBundle triggerFiredBundle)
    {
        this.triggerFiredBundle = triggerFiredBundle;
    }
}


总结:从源码分析中可以看出,任务的整个调度过程为:初始化线程池及调度器QuartzScheduler,然后由线程池去执行QuartzSchedulerThread;将触发器任务(job与触发器)添加到存储器(TreeSet,timeTriggers)中,然后启动调度器;QuartzSchedulerThread从timeTriggers中取出待触发的任务,并包装成TriggerFiredBundle,然后由JobRunShellFactory
创建TriggerFiredBundle的执行线程JobRunShell,调度执行时通过线程池SimpleThreadPool去执行JobRunShell,而JobRunShell执行的就是job.execute(JobExecutionContext context)。Quartz中主要用到的集合类有ArrayList、LinkedList、HashMap、TreeSet(TreeMap);之所以用到上面四个集合类,主要是利用了它们的如下特点:ArrayList访问速度快,LinkedList添加删除元素快;HashMap添加删除快,TreeSet保持元素有序、查找速度快。

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics