Donald_Draper

The Role of TreeSet in Quartz Job Scheduling

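Red-black trees explained: http://www.cnblogs.com/skywang12345/p/3245399.html
Quartz job scheduling source analysis: http://donald-draper.iteye.com/blog/2323118
Introduction:
The previous article, the Quartz job scheduling source analysis, covered job scheduling but not in enough detail. It was written last year, when my understanding was limited, and its treatment of how jobs get scheduled fell short. Today we look at how TreeSet (backed by TreeMap, which is implemented as a red-black tree, an approximately balanced binary search tree) is used during job scheduling; that is the reason for writing this article.
First, a look at the job, the trigger, and the trigger wrapper class. These were covered before, so they are only listed briefly here.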
class TriggerWrapper
{
    public final TriggerKey key;//key of the associated trigger
    public final JobKey jobKey;//key of the associated job
    public final OperableTrigger trigger;//the associated trigger
    public int state;
    public static final int STATE_WAITING = 0;//waiting
    public static final int STATE_ACQUIRED = 1;//acquired by the scheduler, awaiting execution
    public static final int STATE_EXECUTING = 2;//executing
    public static final int STATE_COMPLETE = 3;//complete
    public static final int STATE_PAUSED = 4;//paused
    public static final int STATE_BLOCKED = 5;//blocked
    public static final int STATE_PAUSED_BLOCKED = 6;//paused and blocked
    public static final int STATE_ERROR = 7;//error
}

public abstract class AbstractTrigger
    implements OperableTrigger
{
    private static final long serialVersionUID = -3904243490805975570L;
    private String name;
    private String group;
    private String jobName;
    private String jobGroup;
    private String description;
    private JobDataMap jobDataMap;
    private boolean volatility;
    private String calendarName;
    private String fireInstanceId;
    private int misfireInstruction;
    private int priority;
    private transient TriggerKey key;
}

public class CronTriggerImpl extends AbstractTrigger
    implements CronTrigger, CoreTrigger
{
    private static final long serialVersionUID = -8644953146451592766L;
    protected static final int YEAR_TO_GIVEUP_SCHEDULING_AT;
    private CronExpression cronEx;
    private Date startTime;
    private Date endTime;
    private Date nextFireTime;
    private Date previousFireTime;
    private transient TimeZone timeZone;
    static 
    {
        YEAR_TO_GIVEUP_SCHEDULING_AT = CronExpression.MAX_YEAR;
    }
}

//JobDetailImpl
public class JobDetailImpl
    implements Cloneable, Serializable, JobDetail
{
 private static final long serialVersionUID = -6069784757781506897L;
    private String name;
    private String group;
    private String description;
    private Class jobClass;
    private JobDataMap jobDataMap;
    private boolean durability;
    private boolean shouldRecover;
    private transient JobKey key;
}

Now let's look at how a job is scheduled.
//QuartzScheduler
//schedule the job according to its trigger
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
        throws SchedulerException
{
    ...
    //associate the job with the trigger
    OperableTrigger trig = (OperableTrigger)trigger;
    if(trigger.getJobKey() == null)
         trig.setJobKey(jobDetail.getKey());
    ...
    Calendar cal = null;
    if(trigger.getCalendarName() != null)
    cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
    Date ft = trig.computeFirstFireTime(cal);
        if(ft == null)
        {
            throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString());
        } else
        {
            //store the job and trigger in the JobStore
            resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
            notifySchedulerListenersJobAdded(jobDetail);
            //notify the scheduler thread that the schedule has changed
            notifySchedulerThread(trigger.getNextFireTime().getTime());
            notifySchedulerListenersSchduled(trigger);
            return ft;
        }
     ...
}

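Two things in the method above deserve attention:
1.
//store the job and trigger in the JobStore
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);

2.
//notify the scheduler thread that the schedule has changed
notifySchedulerThread(trigger.getNextFireTime().getTime());

Start with the first point, using RAMJobStore as the example of how a job is stored. This was covered in the previous article, so we focus on the essentials:
1.
//store the job and trigger in the JobStore
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);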

//RAMJobStore
public class RAMJobStore
    implements JobStore
{
    protected HashMap jobsByKey;
    protected HashMap triggersByKey;
    protected HashMap jobsByGroup;
    protected HashMap triggersByGroup;
    protected TreeSet timeTriggers;//time-ordered set of triggers (a red-black tree); the reason for this article
    protected HashMap calendarsByName;
    protected ArrayList triggers;
    protected final Object lock = new Object();
    protected HashSet pausedTriggerGroups;
    protected HashSet pausedJobGroups;
    protected HashSet blockedJobs;
    protected long misfireThreshold;
    protected SchedulerSignaler signaler;//signals scheduling changes to the scheduler
    private final Logger log = LoggerFactory.getLogger(getClass());
    private static final AtomicLong ftrCtr = new AtomicLong(System.currentTimeMillis());
    public RAMJobStore()
    {
        jobsByKey = new HashMap(1000);
        triggersByKey = new HashMap(1000);
        jobsByGroup = new HashMap(25);
        triggersByGroup = new HashMap(25);
        //ordered by the comparator TriggerWrapperComparator
        timeTriggers = new TreeSet(new TriggerWrapperComparator());
        calendarsByName = new HashMap(25);
        triggers = new ArrayList(1000);
        pausedTriggerGroups = new HashSet();
        pausedJobGroups = new HashSet();
        blockedJobs = new HashSet();
        misfireThreshold = 5000L;
    }
    //store the job and the trigger
    public void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger)
        throws JobPersistenceException
    {
        //store the job
        storeJob(newJob, false);
        //store the trigger
        storeTrigger(newTrigger, false);
    }
    //store the job
    public void storeJob(JobDetail newJob, boolean replaceExisting)
        throws ObjectAlreadyExistsException
    {
        JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
        boolean repl = false;
        synchronized(lock)
        {
            if(jobsByKey.get(jw.key) != null)
            {
                if(!replaceExisting)
                    throw new ObjectAlreadyExistsException(newJob);
                repl = true;
            }
            if(!repl)
            {
                HashMap grpMap = (HashMap)jobsByGroup.get(newJob.getKey().getGroup());
                if(grpMap == null)
                {
                    grpMap = new HashMap(100);
                    jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
                }
                grpMap.put(newJob.getKey(), jw);
                jobsByKey.put(jw.key, jw);
            } else
            {
                JobWrapper orig = (JobWrapper)jobsByKey.get(jw.key);
                orig.jobDetail = jw.jobDetail;
            }
        }
    }
    //store the trigger
     public void storeTrigger(OperableTrigger newTrigger, boolean replaceExisting)
        throws JobPersistenceException
    {
        TriggerWrapper tw = new TriggerWrapper((OperableTrigger)newTrigger.clone());
        synchronized(lock)
        {
            if(triggersByKey.get(tw.key) != null)
            {
                if(!replaceExisting)
                    throw new ObjectAlreadyExistsException(newTrigger);
                removeTrigger(newTrigger.getKey(), false);
            }
            if(retrieveJob(newTrigger.getJobKey()) == null)
                throw new JobPersistenceException((new StringBuilder()).append("The job (").append(newTrigger.getJobKey()).append(") referenced by the trigger does not exist.").toString());
            triggers.add(tw);
            HashMap grpMap = (HashMap)triggersByGroup.get(newTrigger.getKey().getGroup());
            if(grpMap == null)
            {
                grpMap = new HashMap(100);
                triggersByGroup.put(newTrigger.getKey().getGroup(), grpMap);
            }
            grpMap.put(newTrigger.getKey(), tw);
            triggersByKey.put(tw.key, tw);
            if(pausedTriggerGroups.contains(newTrigger.getKey().getGroup()) || pausedJobGroups.contains(newTrigger.getJobKey().getGroup()))
            {
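                tw.state = TriggerWrapper.STATE_PAUSED;
                if(blockedJobs.contains(tw.jobKey))
                    tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
            } else
            if(blockedJobs.contains(tw.jobKey))
                tw.state = TriggerWrapper.STATE_BLOCKED;
            else
                //the focus of this article: add the trigger to the time-ordered set timeTriggers (a TreeSet)
                timeTriggers.add(tw);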
        }
    }
}

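As shown above, scheduling a job first stores the job and the trigger in RAMJobStore and, unless the trigger is paused or blocked, also adds the trigger to the time-ordered set timeTriggers (a TreeSet).

Next, notifying the scheduler thread:
2.
//notify the scheduler thread that the schedule has changed
notifySchedulerThread(trigger.getNextFireTime().getTime());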
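To make the role of timeTriggers concrete, here is a minimal sketch of a TreeSet ordered by next fire time. DemoTrigger and BY_FIRE_TIME are hypothetical stand-ins for TriggerWrapper and TriggerWrapperComparator, whose source is not shown in this article; the tie-break on key is an assumption made so that two triggers due at the same instant can coexist in the set.

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical stand-in for TriggerWrapper: only the fields the ordering needs.
final class DemoTrigger {
    final String key;
    final long nextFireTime;

    DemoTrigger(String key, long nextFireTime) {
        this.key = key;
        this.nextFireTime = nextFireTime;
    }
}

final class TimeTriggersDemo {
    // Assumed ordering: earliest fire time first, then key as a tie-break.
    static final Comparator<DemoTrigger> BY_FIRE_TIME =
        Comparator.comparingLong((DemoTrigger t) -> t.nextFireTime)
                  .thenComparing(t -> t.key);

    static TreeSet<DemoTrigger> newTimeTriggers() {
        return new TreeSet<>(BY_FIRE_TIME);
    }

    public static void main(String[] args) {
        TreeSet<DemoTrigger> timeTriggers = newTimeTriggers();
        timeTriggers.add(new DemoTrigger("c", 3000L));
        timeTriggers.add(new DemoTrigger("a", 1000L));
        timeTriggers.add(new DemoTrigger("b", 2000L));
        // first() always returns the trigger that is due soonest.
        System.out.println(timeTriggers.first().key);
    }
}
```

Insertion order does not matter: the tree keeps the elements sorted, so the trigger due soonest is found in O(log n) time without scanning.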


//QuartzScheduler
  private SchedulerSignaler signaler;
  protected void notifySchedulerThread(long candidateNewNextFireTime)
    {
        if(isSignalOnSchedulingChange())
            signaler.signalSchedulingChange(candidateNewNextFireTime);
    }

//SchedulerSignalerImpl
public class SchedulerSignalerImpl
    implements SchedulerSignaler
{
    Logger log;
    protected QuartzScheduler sched;
    protected QuartzSchedulerThread schedThread;
     public void signalSchedulingChange(long candidateNewNextFireTime)
    {
        schedThread.signalSchedulingChange(candidateNewNextFireTime);
    }
}

//QuartzSchedulerThread
public class QuartzSchedulerThread extends Thread
{
  private QuartzScheduler qs;
    private QuartzSchedulerResources qsRsrcs;
    private final Object sigLock;
    private boolean signaled;
    private long signaledNextFireTime;
    private boolean paused;
    private AtomicBoolean halted;
    private Random random;
    private static long DEFAULT_IDLE_WAIT_TIME = 30000L;
    private long idleWaitTime;
    private int idleWaitVariablness;
    private final Logger log;
  public void signalSchedulingChange(long candidateNewNextFireTime)
    {
        synchronized(sigLock)
        {
            signaled = true;
            signaledNextFireTime = candidateNewNextFireTime;
            sigLock.notifyAll();
        }
    }
    public void run()
    {
        boolean lastAcquireFailed = false;
        while(!halted.get())
        {
            try
            {
                //while paused, wait in one-second steps; leave the loop once halted
                synchronized(sigLock)
                {
                    while(paused && !halted.get())
                        try
                        {
                            sigLock.wait(1000L);
                        }
                        catch(InterruptedException ignore) { }
                    if(halted.get())
                        break;
                }
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0)
                {
                    List triggers = null;//triggers that are due to fire
                    long now = System.currentTimeMillis();
                    clearSignaledSchedulingChange();
                    try
                    {
                        //acquire the triggers that are due to fire
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        lastAcquireFailed = false;
                        if(log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers != null ? triggers.size() : 0) + " triggers");
                    }
                    catch(JobPersistenceException jpe)
                    {
                        if(!lastAcquireFailed)
                            qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.", jpe);
                        lastAcquireFailed = true;
                        continue;
                    }
                    catch(RuntimeException e)
                    {
                        if(!lastAcquireFailed)
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException " + e.getMessage(), e);
                        lastAcquireFailed = true;
                        continue;
                    }
                    if(triggers != null && !triggers.isEmpty())
                    {
                        //use the fire time of the first (earliest) trigger
                        now = System.currentTimeMillis();
                        long triggerTime = ((OperableTrigger)triggers.get(0)).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2L)
                        {
                            synchronized(sigLock)
                            {
                                if(halted.get())
                                    break;
                                if(!isCandidateNewTimeEarlierWithinReason(triggerTime, false))
                                    try
                                    {
                                        //recompute the time left until the trigger fires
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1L)
                                            //at least 1 ms left: wait with a timeout
                                            sigLock.wait(timeUntilTrigger);
                                    }
                                    catch(InterruptedException ignore) { }
                            }
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime))
                                break;
                            //recompute the time left until the trigger fires
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }
                        if(triggers.isEmpty())
                            continue;
                        List bndles = new ArrayList();//fired results, List<TriggerFiredResult>
                        boolean goAhead = true;
                        synchronized(sigLock)
                        {
                            goAhead = !halted.get();
                        }
                        if(goAhead)
                            try
                            {
                                //mark the acquired triggers as fired; each result wraps a TriggerFiredBundle
                                List res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            }
                            catch(SchedulerException se)
                            {
                                qs.notifySchedulerListenersError("An error occurred while firing triggers '" + triggers + "'", se);
                                for(int i = 0; i < triggers.size(); i++)
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger((OperableTrigger)triggers.get(i));
                                continue;
                            }
                        for(int i = 0; i < bndles.size(); i++)
                        {
                            TriggerFiredResult result = (TriggerFiredResult)bndles.get(i);
                            TriggerFiredBundle bndle = result.getTriggerFiredBundle();
                            Exception exception = result.getException();
                            if(exception instanceof RuntimeException)
                            {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger((OperableTrigger)triggers.get(i));
                                continue;
                            }
                            if(bndle == null)
                            {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger((OperableTrigger)triggers.get(i));
                                continue;
                            }
                            //create the JobRunShell (the Runnable that executes the job)
                            JobRunShell shell = null;
                            try
                            {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            }
                            catch(SchedulerException se)
                            {
                                qsRsrcs.getJobStore().triggeredJobComplete((OperableTrigger)triggers.get(i), bndle.getJobDetail(), org.quartz.Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }
                            //hand the shell to the thread pool for execution
                            if(!qsRsrcs.getThreadPool().runInThread(shell))
                            {
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete((OperableTrigger)triggers.get(i), bndle.getJobDetail(), org.quartz.Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }
                        }
                        continue;
                    }
                } else
                {
                    continue;
                }
                //nothing to fire: idle until the wait time elapses or the schedule changes
                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock)
                {
                    try
                    {
                        if(!halted.get() && !isScheduleChanged())
                            sigLock.wait(timeUntilContinue);
                    }
                    catch(InterruptedException ignore) { }
                }
            }
            catch(RuntimeException re)
            {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        }
        qs = null;
        qsRsrcs = null;
    }
}
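The hand-off between signalSchedulingChange and the scheduler loop is a plain monitor wait/notify with a timeout. The sketch below isolates that pattern; SignalDemo and awaitSignal are hypothetical names invented for illustration, and only the sigLock, signaled, and signaledNextFireTime fields mirror the listing above.

```java
final class SignalDemo {
    private final Object sigLock = new Object();
    private boolean signaled;
    private long signaledNextFireTime;

    // Called by the thread that has just stored a new trigger.
    void signalSchedulingChange(long candidateNewNextFireTime) {
        synchronized (sigLock) {
            signaled = true;
            signaledNextFireTime = candidateNewNextFireTime;
            sigLock.notifyAll(); // wake the waiting scheduler loop early
        }
    }

    // Called by the scheduler loop: wait at most maxWaitMillis.
    // Returns true if a signal arrived, false if the timeout elapsed.
    boolean awaitSignal(long maxWaitMillis) {
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        synchronized (sigLock) {
            while (!signaled) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                try {
                    sigLock.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            signaled = false; // consume the signal
            return true;
        }
    }

    long lastSignaledTime() {
        synchronized (sigLock) {
            return signaledNextFireTime;
        }
    }
}
```

The timeout is what lets the loop sleep until the next known fire time, while notifyAll lets a newly stored trigger with an earlier fire time cut that sleep short.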

The method above has three points worth a closer look:
1.
//acquire the triggers that are due to fire
triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

2.
//mark the acquired triggers as fired; each result wraps a TriggerFiredBundle
List res = qsRsrcs.getJobStore().triggersFired(triggers);

3.
//iterate over the fired results (bndles), create a JobRunShell for each TriggerFiredBundle, and hand it to the thread pool
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
qsRsrcs.getThreadPool().runInThread(shell);

Let's go through these three points in turn:

1.
//acquire the triggers that are due to fire
triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());


//RAMJobStore
//acquire up to maxCount triggers whose next fire time is no later than noLaterThan + timeWindow
    public List acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow)
    {
        synchronized(lock)
        {
            List result = new ArrayList();
            Set acquiredJobKeysForNoConcurrentExec = new HashSet();
            Set excludedTriggers = new HashSet();
            long firstAcquiredTriggerFireTime = 0L;
            if(timeTriggers.size() == 0)
                return result;
            while(true)
            {
                TriggerWrapper tw;
                try
                {
                    //take the first (earliest) element of the time-ordered set
                    tw = (TriggerWrapper)timeTriggers.first();
                    if(tw == null)
                        break;
                    //remove it from the set while it is being examined
                    timeTriggers.remove(tw);
                }
                catch(NoSuchElementException nsee)
                {
                    break;
                }
                //no next fire time: drop the trigger and move on to the next one
                if(tw.trigger.getNextFireTime() == null)
                    continue;
                if(applyMisfire(tw))
                {
                    //misfire handling changed the fire time; re-add the trigger if it will still fire
                    if(tw.trigger.getNextFireTime() != null)
                        timeTriggers.add(tw);
                    continue;
                }
                //the earliest trigger lies beyond the window: put it back and stop
                if(tw.getTrigger().getNextFireTime().getTime() > noLaterThan + timeWindow)
                {
                    timeTriggers.add(tw);
                    break;
                }
                JobKey jobKey = tw.trigger.getJobKey();
                JobDetail job = ((JobWrapper)jobsByKey.get(tw.trigger.getJobKey())).jobDetail;
                if(job.isConcurrentExectionDisallowed())
                {
                    //only one trigger per non-concurrent job may be acquired in a batch
                    if(acquiredJobKeysForNoConcurrentExec.contains(jobKey))
                    {
                        excludedTriggers.add(tw);
                        continue;
                    }
                    acquiredJobKeysForNoConcurrentExec.add(jobKey);
                }
                tw.state = TriggerWrapper.STATE_ACQUIRED;//mark the trigger as acquired
                tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
                OperableTrigger trig = (OperableTrigger)tw.trigger.clone();
                //add the trigger to the result list of triggers to fire
                result.add(trig);
                if(firstAcquiredTriggerFireTime == 0L)
                    firstAcquiredTriggerFireTime = tw.trigger.getNextFireTime().getTime();
                if(result.size() == maxCount)
                    break;
            }
            //put the excluded triggers back into the set
            if(excludedTriggers.size() > 0)
                timeTriggers.addAll(excludedTriggers);
            return result;
        }
    }
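Stripped of misfire handling and the concurrency checks, the acquisition loop reduces to "keep taking the earliest element until one lies beyond the window". The sketch below shows just that core under simplifying assumptions: AcquireDemo and Trig are hypothetical names, and pollFirst() is used in place of the first() plus remove() pair in the listing.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

final class AcquireDemo {
    // Hypothetical stand-in for TriggerWrapper.
    static final class Trig {
        final String key;
        final long nextFireTime;

        Trig(String key, long nextFireTime) {
            this.key = key;
            this.nextFireTime = nextFireTime;
        }
    }

    // Assumed ordering: fire time first, key as tie-break.
    static final Comparator<Trig> ORDER =
        Comparator.comparingLong((Trig t) -> t.nextFireTime)
                  .thenComparing(t -> t.key);

    // Take up to maxCount triggers that are due no later than noLaterThan.
    static List<Trig> acquire(TreeSet<Trig> timeTriggers, long noLaterThan, int maxCount) {
        List<Trig> result = new ArrayList<>();
        while (result.size() < maxCount) {
            Trig earliest = timeTriggers.pollFirst(); // null when the set is empty
            if (earliest == null) {
                break;
            }
            if (earliest.nextFireTime > noLaterThan) {
                // Everything after this element is later still: put it back and stop.
                timeTriggers.add(earliest);
                break;
            }
            result.add(earliest);
        }
        return result;
    }
}
```

Because the set is sorted, the loop can stop at the first trigger that falls outside the window; no later element can qualify.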


2.
List bndles;//fired results, List<TriggerFiredResult>
 bndles = new ArrayList();
 boolean goAhead = true;
 synchronized(sigLock)
 {
     goAhead = !halted.get();
 }
 if(goAhead)
     try
     {
      //mark the acquired triggers as fired; each result wraps a TriggerFiredBundle
         List res = qsRsrcs.getJobStore().triggersFired(triggers);
         if(res != null)
             bndles = res;
     }



//RAMJobStore
//update the state of the triggers that have fired
    public List triggersFired(List firedTriggers)
    {
        synchronized(lock)
        {
            List results = new ArrayList();
            for(Iterator it = firedTriggers.iterator(); it.hasNext();)
            {
                OperableTrigger trigger = (OperableTrigger)it.next();
                TriggerWrapper tw = (TriggerWrapper)triggersByKey.get(trigger.getKey());
                //skip triggers that were removed or are no longer in the acquired state
                if(tw == null || tw.trigger == null || tw.state != TriggerWrapper.STATE_ACQUIRED)
                    continue;
                Calendar cal = null;
                if(tw.trigger.getCalendarName() != null)
                {
                    cal = retrieveCalendar(tw.trigger.getCalendarName());
                    if(cal == null)
                        continue;
                }
                //remember the previous fire time
                Date prevFireTime = trigger.getPreviousFireTime();
                //remove the trigger from the time-ordered set timeTriggers
                timeTriggers.remove(tw);
                //recompute the trigger's next fire time
                tw.trigger.triggered(cal);
                trigger.triggered(cal);
                tw.state = TriggerWrapper.STATE_WAITING;//reset the trigger to the waiting state
                TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(tw.jobKey), trigger, cal, false, new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
                JobDetail job = bndle.getJobDetail();
                if(job.isConcurrentExectionDisallowed())
                {
                    //block every other trigger of a job that disallows concurrent execution
                    ArrayList trigs = getTriggerWrappersForJob(job.getKey());
                    for(Iterator j = trigs.iterator(); j.hasNext();)
                    {
                        TriggerWrapper ttw = (TriggerWrapper)j.next();
                        if(ttw.state == TriggerWrapper.STATE_WAITING)
                            ttw.state = TriggerWrapper.STATE_BLOCKED;
                        if(ttw.state == TriggerWrapper.STATE_PAUSED)
                            ttw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
                        timeTriggers.remove(ttw);
                    }
                    blockedJobs.add(job.getKey());
                } else
                if(tw.trigger.getNextFireTime() != null)
                    synchronized(lock)
                    {
                        //the trigger will fire again: add it back to timeTriggers
                        timeTriggers.add(tw);
                    }
                results.add(new TriggerFiredResult(bndle));
            }
            return results;
        }
    }


//TriggerFiredBundle
public class TriggerFiredBundle
    implements Serializable
{
    private static final long serialVersionUID = -6414106108306999265L;
    private JobDetail job;
    private OperableTrigger trigger;
    private Calendar cal;
    private boolean jobIsRecovering;
    private Date fireTime;
    private Date scheduledFireTime;
    private Date prevFireTime;
    private Date nextFireTime;
}
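Note the order of operations in triggersFired: the wrapper is removed from timeTriggers before triggered(cal) changes its next fire time, and only then added back. A sorted set locates elements by comparing them, so changing the sort key of an element while it is still inside the tree can leave it stranded in the wrong position. The sketch below shows the safe remove, update, re-add sequence; RescheduleDemo, Trig, and the fixed interval are hypothetical simplifications.

```java
import java.util.Comparator;
import java.util.TreeSet;

final class RescheduleDemo {
    // Hypothetical stand-in for TriggerWrapper with a mutable fire time.
    static final class Trig {
        final String key;
        long nextFireTime;

        Trig(String key, long nextFireTime) {
            this.key = key;
            this.nextFireTime = nextFireTime;
        }
    }

    // Assumed ordering: fire time first, key as tie-break.
    static final Comparator<Trig> ORDER =
        Comparator.comparingLong((Trig t) -> t.nextFireTime)
                  .thenComparing(t -> t.key);

    // Fire a trigger and schedule its next run intervalMillis later.
    static void fireAndReschedule(TreeSet<Trig> timeTriggers, Trig t, long intervalMillis) {
        timeTriggers.remove(t);           // remove while the sort key still has its old value
        t.nextFireTime += intervalMillis; // stands in for trigger.triggered(cal)
        timeTriggers.add(t);              // re-insert at the position of the new fire time
    }
}
```

After the call the set is still correctly ordered, so the next first() returns whichever trigger is now due soonest.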

3.
//iterate over the fired results, create a JobRunShell for each TriggerFiredBundle, and run it
        for(int i = 0; i < bndles.size(); i++)
        {
            TriggerFiredResult result = (TriggerFiredResult)bndles.get(i);
            TriggerFiredBundle bndle = result.getTriggerFiredBundle();
            Exception exception = result.getException();
            if(exception instanceof RuntimeException)
            {
                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                qsRsrcs.getJobStore().releaseAcquiredTrigger((OperableTrigger)triggers.get(i));
                continue;
            }
            if(bndle == null)
            {
                qsRsrcs.getJobStore().releaseAcquiredTrigger((OperableTrigger)triggers.get(i));
                continue;
            }
            //create the JobRunShell (the Runnable that executes the job)
            JobRunShell shell = null;
            try
            {
                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                shell.initialize(qs);
            }
            catch(SchedulerException se)
            {
                qsRsrcs.getJobStore().triggeredJobComplete((OperableTrigger)triggers.get(i), bndle.getJobDetail(), org.quartz.Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                continue;
            }
            //hand the shell to the thread pool for execution
            if(!qsRsrcs.getThreadPool().runInThread(shell))
            {
                getLog().error("ThreadPool.runInThread() return false!");
                qsRsrcs.getJobStore().triggeredJobComplete((OperableTrigger)triggers.get(i), bndle.getJobDetail(), org.quartz.Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
            }
        }


The main thing to look at here is how the JobRunShell, the Runnable that executes the job, is created:
public class StdJobRunShellFactory
    implements JobRunShellFactory
{
    private Scheduler scheduler;
    public StdJobRunShellFactory()
    {
    }

    public void initialize(Scheduler sched)
    {
        scheduler = sched;
    }
    //create the Runnable that executes the fired trigger's job
    public JobRunShell createJobRunShell(TriggerFiredBundle bndle)
        throws SchedulerException
    {
        return new JobRunShell(scheduler, bndle);
    }  
}

//JobRunShell
public class JobRunShell extends SchedulerListenerSupport
    implements Runnable
{ 
    protected JobExecutionContextImpl jec;
    protected QuartzScheduler qs;
    protected TriggerFiredBundle firedTriggerBundle;
    protected Scheduler scheduler;
    protected volatile boolean shutdownRequested;
    private final Logger log = LoggerFactory.getLogger(getClass());
     public void run()  
    {  
        //register this shell as an internal scheduler listener
        qs.addInternalSchedulerListener(this);  
label0:  
        {  
            //jec (protected JobExecutionContextImpl jec) is the job execution context
            OperableTrigger trigger = (OperableTrigger)jec.getTrigger();  
            JobDetail jobDetail = jec.getJobDetail();  
            org.quartz.Trigger.CompletedExecutionInstruction instCode;  
            do  
            {  
                JobExecutionException jobExEx = null;  
                //get the Job instance from the execution context
                Job job = jec.getJobInstance();  
                try  
                {  
                    begin();  
                }  
                catch(SchedulerException se)  
                {  
                    qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn't begin execution.").toString(), se);  
                    break label0;  
                }  
                try  
                {  
                    if(!notifyListenersBeginning(jec))  
                        break label0;  
                }  
                catch(VetoedException ve)  
                {  
                    try  
                    {  
                        org.quartz.Trigger.CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);  
                        qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);  
                        if(jec.getTrigger().getNextFireTime() == null)  
                            qs.notifySchedulerListenersFinalized(jec.getTrigger());  
                        complete(true);  
                    }  
                    catch(SchedulerException se)  
                    {  
                        qs.notifySchedulerListenersError((new StringBuilder()).append("Error during veto of Job (").append(jec.getJobDetail().getKey()).append(": couldn't finalize execution.").toString(), se);  
                    }  
                    break label0;  
                }  
                long startTime = System.currentTimeMillis();  
                long endTime = startTime;  
                try  
                {  
                    log.debug((new StringBuilder()).append("Calling execute on job ").append(jobDetail.getKey()).toString());  
                    //execute the Job; this is the key step
                     job.execute(jec);  
                    endTime = System.currentTimeMillis();  
                }  
                catch(JobExecutionException jee)  
                {  
                    endTime = System.currentTimeMillis();  
                    jobExEx = jee;  
                    getLog().info((new StringBuilder()).append("Job ").append(jobDetail.getKey()).append(" threw a JobExecutionException: ").toString(), jobExEx);  
                }  
               ...
              //record the job's run time on the JobExecutionContext
              jec.setJobRunTime(endTime - startTime);  
              ...
             //notify the JobStore that the job has completed
            qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);  
        }  
    }  
}
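The essence of JobRunShell is a Runnable that wraps one job execution, times it, and is handed to a pool of worker threads. The sketch below illustrates that shape with the JDK's ExecutorService; it is not how Quartz implements its ThreadPool, and RunShellDemo, Shell, and runAll are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RunShellDemo {
    // Hypothetical shell: runs one job and records how long it took.
    static final class Shell implements Runnable {
        private final Runnable job;
        private final AtomicInteger completed;
        volatile long runTimeMillis = -1L;

        Shell(Runnable job, AtomicInteger completed) {
            this.job = job;
            this.completed = completed;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                job.run(); // corresponds to job.execute(jec)
            } finally {
                runTimeMillis = System.currentTimeMillis() - start; // cf. jec.setJobRunTime
                completed.incrementAndGet();                        // cf. notifyJobStoreJobComplete
            }
        }
    }

    // Wrap each job in a Shell, run them on a fixed pool, return how many finished.
    static int runAll(List<Runnable> jobs, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger completed = new AtomicInteger();
        for (Runnable job : jobs) {
            pool.execute(new Shell(job, completed));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }
}
```

Keeping the timing and completion bookkeeping in the shell, rather than in the job, is what lets the scheduler treat every job uniformly.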




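Summary:
Scheduling a job first associates the job with its trigger, then stores both in RAMJobStore's job and trigger collections, adds the trigger to the time-ordered set timeTriggers (a TreeSet), and finally notifies the scheduler thread. The scheduler thread first takes from timeTriggers the triggers that fall inside the firing time window and marks them as acquired. It then computes how long remains until the first trigger fires and, if that time has not yet arrived, waits with a timeout. Next it calls triggersFired, which picks out the triggers that are still in the acquired state, removes each one from timeTriggers, computes its next fire time, wraps it in a TriggerFiredBundle, and, if the next fire time is not null, adds the trigger back to timeTriggers.
The scheduler thread wraps each TriggerFiredBundle in a JobRunShell and hands it to the scheduler's thread pool for execution. The key structure throughout this process is timeTriggers. It is a TreeSet; TreeSet is built on TreeMap, and TreeMap is implemented as a red-black tree, an approximately balanced binary search tree.

Now a brief look at TreeSet: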
public class TreeSet<E> extends AbstractSet<E>
    implements NavigableSet<E>, Cloneable, java.io.Serializable
{
    /**
     * The backing map.
     */
    private transient NavigableMap<E,Object> m;

    // Dummy value to associate with an Object in the backing Map
    private static final Object PRESENT = new Object();

    /**
     * Constructs a set backed by the specified navigable map.
     */
    TreeSet(NavigableMap<E,Object> m) {
        this.m = m;
    }

    /**
     * Constructs a new, empty tree set, sorted according to the
     * natural ordering of its elements.  All elements inserted into
     * the set must implement the {@link Comparable} interface.
     * Furthermore, all such elements must be <i>mutually
     * comparable</i>: {@code e1.compareTo(e2)} must not throw a
     * {@code ClassCastException} for any elements {@code e1} and
     * {@code e2} in the set.  If the user attempts to add an element
     * to the set that violates this constraint (for example, the user
     * attempts to add a string element to a set whose elements are
     * integers), the {@code add} call will throw a
     * {@code ClassCastException}.
     By default the backing map is a TreeMap.
     */
    public TreeSet() {
	this(new TreeMap<E,Object>());
    }
 
    public boolean add(E e) {
        // Delegate to the backing TreeMap; put() returns null when the key was not present
        return m.put(e, PRESENT)==null;
    }
    ...
}

//TreeMap
public class TreeMap<K,V>
    extends AbstractMap<K,V>
    implements NavigableMap<K,V>, Cloneable, java.io.Serializable
{
    /**
     * The comparator used to maintain order in this tree map, or
     * null if it uses the natural ordering of its keys.
     That is, when no comparator is given, keys are ordered by their natural ordering (Comparable).
     *
     * @serial
     */
    private final Comparator<? super K> comparator;

    private transient Entry<K,V> root = null;

    /**
     * The number of entries in the tree
     */
    private transient int size = 0;

    /**
     * The number of structural modifications to the tree.
     */
    private transient int modCount = 0;
     /**
     * Constructs a new, empty tree map, using the natural ordering of its
     * keys.  All keys inserted into the map must implement the {@link
     * Comparable} interface.  Furthermore, all such keys must be
     * <i>mutually comparable</i>: <tt>k1.compareTo(k2)</tt> must not throw
     * a <tt>ClassCastException</tt> for any keys <tt>k1</tt> and
     * <tt>k2</tt> in the map.  If the user attempts to put a key into the
     * map that violates this constraint (for example, the user attempts to
     * put a string key into a map whose keys are integers), the
     * <tt>put(Object key, Object value)</tt> call will throw a
     * <tt>ClassCastException</tt>.
     */
    public TreeMap() {
        comparator = null;
    }
     /**
     * Associates the specified value with the specified key in this map.
     * If the map previously contained a mapping for the key, the old
     * value is replaced.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     *
     * @return the previous value associated with <tt>key</tt>, or
     *         <tt>null</tt> if there was no mapping for <tt>key</tt>.
     *         (A <tt>null</tt> return can also indicate that the map
     *         previously associated <tt>null</tt> with <tt>key</tt>.)
     * @throws ClassCastException if the specified key cannot be compared
     *         with the keys currently in the map
     * @throws NullPointerException if the specified key is null
     *         and this map uses natural ordering, or its comparator
     *         does not permit null keys
     */
    public V put(K key, V value) {
        // Start from the root of the red-black tree
        Entry<K,V> t = root;
        if (t == null) {
	    // TBD:
	    // 5045147: (coll) Adding null to an empty TreeSet should
	    // throw NullPointerException
	    //
	    // compare(key, key); // type check
            root = new Entry<K,V>(key, value, null);
            size = 1;
            modCount++;
            return null;
        }
        int cmp;
        Entry<K,V> parent;
        // split comparator and comparable paths
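        // If a comparator was supplied, use it
        Comparator<? super K> cpr = comparator;
        if (cpr != null) {
            do {
                parent = t;
                // Compare the new key with the current node's key using the comparator
                cmp = cpr.compare(key, t.key);
                if (cmp < 0)
                    // New key is smaller than this node: descend left until a null child is reached
                    t = t.left;
                else if (cmp > 0)
                    // New key is larger than this node: descend right until a null child is reached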
                    t = t.right;
                else
                    return t.setValue(value);
            } while (t != null);
        }
        else {
            // No comparator: use the key's own Comparable.compareTo
            if (key == null)
                throw new NullPointerException();
            Comparable<? super K> k = (Comparable<? super K>) key;
            do {
                parent = t;
                cmp = k.compareTo(t.key);
                if (cmp < 0)
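                    // New key is smaller than this node: descend left until a null child is reached
                    t = t.left;
                else if (cmp > 0)
                    // New key is larger than this node: descend right until a null child is reached
                    t = t.right;
                else
                    // Keys are equal: replace the value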
                    return t.setValue(value);
            } while (t != null);
        }
        Entry<K,V> e = new Entry<K,V>(key, value, parent);
        if (cmp < 0)
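            // Attach as the left child of the parent found above
            parent.left = e;
        else
            // Attach as the right child of the parent found above
            parent.right = e;
        // Recolor and rotate if needed to restore the red-black properties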
        fixAfterInsertion(e);
        size++;
        modCount++;
        return null;
    }
}
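
To make the link between TreeMap.put and TreeSet.add concrete, here is a small sketch using only the public java.util API. The class name PutReturnSketch and the string keys are made up for the illustration. It shows that put returns null for a new key and the previous value for an existing key, which is exactly what the `m.put(e, PRESENT)==null` test in add relies on.

```java
import java.util.TreeMap;
import java.util.TreeSet;

class PutReturnSketch {
    static String demo() {
        // Same idea as TreeSet's backing map: every key maps to one shared dummy value.
        TreeMap<String, Object> map = new TreeMap<>();
        Object present = new Object();
        boolean firstPutWasNew = map.put("trigger-1", present) == null;   // key absent
        boolean secondPutWasNew = map.put("trigger-1", present) == null;  // key already there

        // TreeSet.add reports the same thing through its boolean result.
        TreeSet<String> set = new TreeSet<>();
        boolean firstAdd = set.add("trigger-1");
        boolean secondAdd = set.add("trigger-1");

        return firstPutWasNew + "," + secondPutWasNew + "," + firstAdd + "," + secondAdd;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true,false,true,false
    }
}
```

One consequence worth keeping in mind: two elements for which the comparison returns 0 count as the same element, so the second one is not stored.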


Next, how the first and last elements of a TreeSet are obtained:
//TreeSet: first and last elements
/**
 * @throws NoSuchElementException {@inheritDoc}
 */
public E first() {
    return m.firstKey();
}

//TreeMap
/**
 * @throws NoSuchElementException {@inheritDoc}
 Returns the first (lowest) key
 */
public K firstKey() {
    return key(getFirstEntry());
}
/**
 * Returns the first Entry in the TreeMap (according to the TreeMap's
 * key-sort function).  Returns null if the TreeMap is empty.
 The first Entry is the leftmost node of the tree
 */
final Entry<K,V> getFirstEntry() {
    Entry<K,V> p = root;
    if (p != null)
        while (p.left != null)
            p = p.left;
    return p;
}
/**
 * Returns the key corresponding to the specified Entry.
 * @throws NoSuchElementException if the Entry is null
 Returns the key of the given entry
 */
static <K> K key(Entry<K,?> e) {
    if (e==null)
        throw new NoSuchElementException();
    return e.key;
}


/**
 * Returns the last Entry in the TreeMap (according to the TreeMap's
 * key-sort function).  Returns null if the TreeMap is empty.
 The last Entry is the rightmost node of the tree
 */
final Entry<K,V> getLastEntry() {
    Entry<K,V> p = root;
    if (p != null)
        while (p.right != null)
            p = p.right;
    return p;
}

As the code above shows, TreeSet is really just a thin layer over TreeMap.
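
A quick check of first() and last() with the standard TreeSet API, as a minimal sketch (FirstLastSketch is a made-up class name). It also exercises the empty-set case, where key(null) in the listing above throws NoSuchElementException.

```java
import java.util.NoSuchElementException;
import java.util.TreeSet;

class FirstLastSketch {
    static String demo() {
        TreeSet<Integer> set = new TreeSet<>();

        // On an empty set getFirstEntry() returns null, so first() throws.
        String emptyResult;
        try {
            set.first();
            emptyResult = "no exception";
        } catch (NoSuchElementException e) {
            emptyResult = "NoSuchElementException";
        }

        // Insertion order does not matter; the tree keeps the elements sorted.
        set.add(30);
        set.add(10);
        set.add(20);

        // first() is the leftmost node, last() the rightmost.
        return emptyResult + "," + set.first() + "," + set.last();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints NoSuchElementException,10,30
    }
}
```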
Now let's look at the trigger's Comparable implementation:
public class CronTriggerImpl extends AbstractTrigger
    implements CronTrigger, CoreTrigger
{

public abstract class AbstractTrigger
    implements OperableTrigger
{
...
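    public int compareTo(Trigger other)
    {
        // Both keys are null: treat the triggers as equal
        if(other.getKey() == null && getKey() == null)
            return 0;
        // Only the other trigger's key is null: this trigger sorts first
        if(other.getKey() == null)
            return -1;
        // Only this trigger's key is null: this trigger sorts last
        if(getKey() == null)
            return 1;
        else
            // Otherwise compare the keys
            return getKey().compareTo(other.getKey());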
	}
}

public final class TriggerKey extends Key
{

public class Key
    implements Serializable, Comparable
{
...
   // Compiler-generated bridge method (the decompiler renders the bridge flag as "volatile")
   public volatile int compareTo(Object x0)
    {
        return compareTo((Key)x0);
    }
   public int compareTo(Key o)
    {
        if(group.equals("DEFAULT") && !o.group.equals("DEFAULT"))
            return -1;
        if(!group.equals("DEFAULT") && o.group.equals("DEFAULT"))
            return 1;
        // The DEFAULT group sorts first; otherwise compare group names, and if they match, compare key names
        int r = group.compareTo(o.getGroup());
        if(r != 0)
            return r;
        else
            return name.compareTo(o.getName());
    }
...
}
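
The effect of the DEFAULT-group rule is easy to miss, so here is a small sketch that copies the comparison logic into a hypothetical GroupedKey class (not the Quartz Key class) and sorts a few keys with it. The group "A" would come before "DEFAULT" in plain string order, yet the DEFAULT keys still sort first.

```java
import java.util.TreeSet;

class KeyOrderSketch {
    // Hypothetical key type that mirrors the compareTo logic shown above.
    static final class GroupedKey implements Comparable<GroupedKey> {
        final String group;
        final String name;

        GroupedKey(String group, String name) {
            this.group = group;
            this.name = name;
        }

        @Override
        public int compareTo(GroupedKey o) {
            // Keys in the DEFAULT group always come before keys in any other group.
            if (group.equals("DEFAULT") && !o.group.equals("DEFAULT")) return -1;
            if (!group.equals("DEFAULT") && o.group.equals("DEFAULT")) return 1;
            // Otherwise order by group name, then by key name.
            int r = group.compareTo(o.group);
            return r != 0 ? r : name.compareTo(o.name);
        }

        @Override
        public String toString() {
            return group + "." + name;
        }
    }

    static String sortedKeys() {
        TreeSet<GroupedKey> keys = new TreeSet<>();
        keys.add(new GroupedKey("A", "t2"));
        keys.add(new GroupedKey("DEFAULT", "t9"));
        keys.add(new GroupedKey("A", "t1"));
        keys.add(new GroupedKey("DEFAULT", "t3"));
        return keys.toString();
    }

    public static void main(String[] args) {
        System.out.println(sortedKeys()); // prints [DEFAULT.t3, DEFAULT.t9, A.t1, A.t2]
    }
}
```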

Next, let's follow the type hierarchy down to the concrete comparator implementation:
public interface OperableTrigger
    extends MutableTrigger
{

public interface MutableTrigger
    extends Trigger
{

Difference between Comparable and Comparator: http://blog.csdn.net/mageshuai/article/details/3849143
public interface Trigger
    extends Serializable, Cloneable, Comparable
{
    public static class TriggerTimeComparator
        implements Comparator, Serializable
    {
        private static final long serialVersionUID = -3904243490805975570L;

        public TriggerTimeComparator()
        {
        }
        // Compiler-generated bridge method (the decompiler renders the bridge flag as "volatile")
         public volatile int compare(Object x0, Object x1)
        {
            return compare((Trigger)x0, (Trigger)x1);
        }
        // Compare two triggers
        public int compare(Trigger t1, Trigger t2)
        {
            return compare(t1.getNextFireTime(), t1.getPriority(), t1.getKey(), t2.getNextFireTime(), t2.getPriority(), t2.getKey());
        }
        // Compare by next fire time, then priority, then trigger key
        public static int compare(Date nextFireTime1, int priority1, TriggerKey key1, Date nextFireTime2, int priority2, TriggerKey key2)
        {
            if(nextFireTime1 != null || nextFireTime2 != null)
            {
                // At least one next fire time is non-null; a trigger with a null fire time sorts last
                if(nextFireTime1 == null)
                    return 1;
                if(nextFireTime2 == null)
                    return -1;
                // Trigger 1 fires earlier than trigger 2, so it sorts before it
                if(nextFireTime1.before(nextFireTime2))
                    return -1;
                // Trigger 1 fires later than trigger 2, so it sorts after it
                if(nextFireTime1.after(nextFireTime2))
                    return 1;
            }
            // Fire times are equal (or both null): compare priorities, higher priority first
            int comp = priority2 - priority1;
            if(comp != 0)
                return comp;
            else
                // Priorities are equal as well: fall back to comparing the trigger keys
                return key1.compareTo(key2);
        }
    }

    public static final class CompletedExecutionInstruction extends Enum
    {

        public static CompletedExecutionInstruction[] values()
        {
            return (CompletedExecutionInstruction[])$VALUES.clone();
        }

        public static CompletedExecutionInstruction valueOf(String name)
        {
            return (CompletedExecutionInstruction)Enum.valueOf(org/quartz/Trigger$CompletedExecutionInstruction, name);
        }

        public static final CompletedExecutionInstruction NOOP;
        public static final CompletedExecutionInstruction RE_EXECUTE_JOB;
        public static final CompletedExecutionInstruction SET_TRIGGER_COMPLETE;
        public static final CompletedExecutionInstruction DELETE_TRIGGER;
        public static final CompletedExecutionInstruction SET_ALL_JOB_TRIGGERS_COMPLETE;
        public static final CompletedExecutionInstruction SET_TRIGGER_ERROR;
        public static final CompletedExecutionInstruction SET_ALL_JOB_TRIGGERS_ERROR;
        private static final CompletedExecutionInstruction $VALUES[];

        static 
        {
            NOOP = new CompletedExecutionInstruction("NOOP", 0);
            RE_EXECUTE_JOB = new CompletedExecutionInstruction("RE_EXECUTE_JOB", 1);
            SET_TRIGGER_COMPLETE = new CompletedExecutionInstruction("SET_TRIGGER_COMPLETE", 2);
            DELETE_TRIGGER = new CompletedExecutionInstruction("DELETE_TRIGGER", 3);
            SET_ALL_JOB_TRIGGERS_COMPLETE = new CompletedExecutionInstruction("SET_ALL_JOB_TRIGGERS_COMPLETE", 4);
            SET_TRIGGER_ERROR = new CompletedExecutionInstruction("SET_TRIGGER_ERROR", 5);
            SET_ALL_JOB_TRIGGERS_ERROR = new CompletedExecutionInstruction("SET_ALL_JOB_TRIGGERS_ERROR", 6);
            $VALUES = (new CompletedExecutionInstruction[] {
                NOOP, RE_EXECUTE_JOB, SET_TRIGGER_COMPLETE, DELETE_TRIGGER, SET_ALL_JOB_TRIGGERS_COMPLETE, SET_TRIGGER_ERROR, SET_ALL_JOB_TRIGGERS_ERROR
            });
        }

        private CompletedExecutionInstruction(String s, int i)
        {
            super(s, i);
        }
    }

    public static final class TriggerState extends Enum
    {

        public static TriggerState[] values()
        {
            return (TriggerState[])$VALUES.clone();
        }

        public static TriggerState valueOf(String name)
        {
            return (TriggerState)Enum.valueOf(org/quartz/Trigger$TriggerState, name);
        }

        public static final TriggerState NONE;
        public static final TriggerState NORMAL;
        public static final TriggerState PAUSED;
        public static final TriggerState COMPLETE;
        public static final TriggerState ERROR;
        public static final TriggerState BLOCKED;
        private static final TriggerState $VALUES[];

        static 
        {
            NONE = new TriggerState("NONE", 0);
            NORMAL = new TriggerState("NORMAL", 1);
            PAUSED = new TriggerState("PAUSED", 2);
            COMPLETE = new TriggerState("COMPLETE", 3);
            ERROR = new TriggerState("ERROR", 4);
            BLOCKED = new TriggerState("BLOCKED", 5);
            $VALUES = (new TriggerState[] {
                NONE, NORMAL, PAUSED, COMPLETE, ERROR, BLOCKED
            });
        }

        private TriggerState(String s, int i)
        {
            super(s, i);
        }
    }
    public abstract TriggerKey getKey();
    public abstract JobKey getJobKey();
    public abstract String getDescription();
    public abstract String getCalendarName();
    public abstract JobDataMap getJobDataMap();
    public abstract int getPriority();
    public abstract boolean mayFireAgain();
    public abstract Date getStartTime();
    public abstract Date getEndTime();
    public abstract Date getNextFireTime();
    public abstract Date getPreviousFireTime();
    public abstract Date getFireTimeAfter(Date date);
    public abstract Date getFinalFireTime();
    public abstract int getMisfireInstruction();
    public abstract TriggerBuilder getTriggerBuilder();
    public abstract ScheduleBuilder getScheduleBuilder();
    public abstract boolean equals(Object obj);
    // compareTo is left for implementing classes
    public abstract int compareTo(Trigger trigger);
    public static final long serialVersionUID = -3904243490805975570L;
    public static final int MISFIRE_INSTRUCTION_SMART_POLICY = 0;
    public static final int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
    public static final int DEFAULT_PRIORITY = 5;
}

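From the analysis so far, it looks as if adding a trigger to the trigger set timeTriggers (TreeSet) would use AbstractTrigger's compareTo, which only compares trigger keys. That would not make much sense; Trigger's static nested comparator class TriggerTimeComparator seems far more appropriate. The Quartz javadoc describes TriggerTimeComparator as follows: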
A Comparator that compares trigger's next fire times, or in other words,
sorts them according to earliest next fire time.
If the fire times are the same, then the triggers are sorted according to priority
(highest value first), if the priorities are the same, then they are sorted by key.
Triggers added to timeTriggers (TreeSet) ought to be ordered by the TriggerTimeComparator rather than by AbstractTrigger's compareTo, and this puzzled me for a while. Then it clicked:
public class RAMJobStore
    implements JobStore
{
    protected HashMap jobsByKey;
    protected HashMap triggersByKey;
    protected HashMap jobsByGroup;
    protected HashMap triggersByGroup;
    protected TreeSet timeTriggers;// trigger set, a red-black tree; the reason for writing this article
    protected HashMap calendarsByName;
    protected ArrayList triggers;
    protected final Object lock = new Object();
    protected HashSet pausedTriggerGroups;
    protected HashSet pausedJobGroups;
    protected HashSet blockedJobs;
    protected long misfireThreshold;
    protected SchedulerSignaler signaler;// scheduler signaler
    public RAMJobStore()
    {
        jobsByKey = new HashMap(1000);
        triggersByKey = new HashMap(1000);
        jobsByGroup = new HashMap(25);
        triggersByGroup = new HashMap(25);
        // This line is the key: the set is constructed with a TriggerWrapperComparator
        timeTriggers = new TreeSet(new TriggerWrapperComparator());
        calendarsByName = new HashMap(25);
        triggers = new ArrayList(1000);
        pausedTriggerGroups = new HashSet();
        pausedJobGroups = new HashSet();
        blockedJobs = new HashSet();
        misfireThreshold = 5000L;
    }
}

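So the trigger set timeTriggers is constructed as a TreeSet whose comparator is TriggerWrapperComparator, a comparator for TriggerWrapper. It first compares the triggers' next fire times; if those are equal it compares their priorities, and if those are also equal it finally compares the trigger keys. Reading the real source is the better approach: decompiled code is hard to follow closely and details are easily lost. My earlier Quartz articles were written a long time ago from decompiled classes rather than the source, and the scheduling analysis in them was not thorough, which is why I came back to it now that I am using Quartz again.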
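
The three-level ordering (next fire time, then priority, then key) can be reproduced with a small comparator. This is a sketch following the logic of the TriggerTimeComparator.compare method listed earlier, applied to a hypothetical Entry class with string keys; it is not the TriggerWrapperComparator source itself.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.TreeSet;

class FireOrderSketch {
    // Hypothetical stand-in for a trigger wrapper (not a Quartz class).
    static final class Entry {
        final String key;
        final Date nextFireTime; // null means the trigger will not fire again
        final int priority;

        Entry(String key, Date nextFireTime, int priority) {
            this.key = key;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
        }
    }

    static final Comparator<Entry> ORDER = (a, b) -> {
        Date t1 = a.nextFireTime;
        Date t2 = b.nextFireTime;
        if (t1 != null || t2 != null) {
            if (t1 == null) return 1;    // a null fire time sorts last
            if (t2 == null) return -1;
            int byTime = t1.compareTo(t2);
            if (byTime != 0) return byTime; // earlier fire time first
        }
        // Same fire time (or both null): higher priority first.
        int byPriority = Integer.compare(b.priority, a.priority);
        // Same priority as well: fall back to the key.
        return byPriority != 0 ? byPriority : a.key.compareTo(b.key);
    };

    static String order() {
        TreeSet<Entry> set = new TreeSet<>(ORDER);
        set.add(new Entry("late", new Date(2000L), 5));
        set.add(new Entry("lowPriority", new Date(1000L), 1));
        set.add(new Entry("highPriority", new Date(1000L), 9));
        set.add(new Entry("finished", null, 5));

        List<String> names = new ArrayList<>();
        for (Entry e : set) {
            names.add(e.key);
        }
        return String.join(",", names);
    }

    public static void main(String[] args) {
        System.out.println(order()); // prints highPriority,lowPriority,late,finished
    }
}
```

Ending the comparison on the key matters for a TreeSet: without that last tie-breaker, two different triggers with the same fire time and priority would compare as equal and only one of them would be kept.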
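    The role of TreeSet in Quartz, then, is to store the triggers awaiting execution, ordered primarily by next fire time. When Quartz schedules jobs, it takes the triggers that fall inside the given time window out of the set and marks them as acquired; once an acquired trigger's fire time arrives, the trigger is taken and its job is dispatched. In both of these steps, taking a trigger from the set means taking the TreeSet's first element, that is, the trigger with the earliest fire time.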