`
Donald_Draper
  • 浏览: 952039 次
社区版块
存档分类
最新评论

Quartz的job、触发器的暂停与恢复源码分析

阅读更多
Quartz的使用:http://donald-draper.iteye.com/blog/2321886
Quartz的Scheduler初始化源码分析:http://donald-draper.iteye.com/blog/2322730
Quartz的job、触发器的暂停与恢复源码分析:http://donald-draper.iteye.com/blog/2322823
Quartz的Job存储,触发器、任务删除,源码分析:http://donald-draper.iteye.com/blog/2322725
Quartzs的job,trriger监听器源码分析:http://donald-draper.iteye.com/blog/2322863
Quartz 任务存储JobStoreTX 持久化之RDB:http://donald-draper.iteye.com/blog/2323297
Quartz 任务存储JobStoreTX 持久化之RDB-源码分析:http://donald-draper.iteye.com/blog/2323409
Quartz任务调度源码分析:http://donald-draper.iteye.com/blog/2323118
Spring与Quartz集成详解:http://donald-draper.iteye.com/blog/2323591
Spring与Quartz集成-源码分析:http://donald-draper.iteye.com/blog/2324132]

由于前面QuartzScheduler已经介绍过,我们这里直接看代码
public class QuartzScheduler
    implements RemotableQuartzScheduler
{
//暂停触发器
 public void pauseTrigger(TriggerKey triggerKey)
        throws SchedulerException
    {
        //检查调度器状态,是否关闭
        validateState();
	//暂停triggerKey
        resources.getJobStore().pauseTrigger(triggerKey);
	//通知调度器下一刻调度时间 
        notifySchedulerThread(0L);
	//添加trigger暂定事件到调度监听器  
        notifySchedulerListenersPausedTrigger(triggerKey);
    }
//恢复触发器
     public void resumeTrigger(TriggerKey triggerKey)
        throws SchedulerException
    {
       //检查调度器状态,是否关闭
        validateState();
	//恢复触发器triggerKey
        resources.getJobStore().resumeTrigger(triggerKey);
	////通知调度器下一刻调度时间 
        notifySchedulerThread(0L);
	//添加trigger恢复事件到调度监听器  
        notifySchedulerListenersResumedTrigger(triggerKey);
    }
    //暂定job
    public void pauseJob(JobKey jobKey)
        throws SchedulerException
    {
        validateState();
	//暂定job,关键
        resources.getJobStore().pauseJob(jobKey);
        notifySchedulerThread(0L);
	添加job暂定事件到调度监听器 
        notifySchedulerListenersPausedJob(jobKey);
    }
    //恢复job
    public void resumeJob(JobKey jobKey)
        throws SchedulerException
    {
        validateState();
	//恢复job,关键
        resources.getJobStore().resumeJob(jobKey);
        notifySchedulerThread(0L);
	//添加job恢复事件到调度监听器  
        notifySchedulerListenersResumedJob(jobKey);
    }
    //检查调度器状态,是否关闭
     public void validateState()
        throws SchedulerException
    {
        if(isShutdown())
            throw new SchedulerException("The Scheduler has been shutdown.");
        else
            return;
    }
    private static String VERSION_MAJOR;
    private static String VERSION_MINOR;
    private static String VERSION_ITERATION;
    private QuartzSchedulerResources resources;
    private QuartzSchedulerThread schedThread;
    private ThreadGroup threadGroup;
    private SchedulerContext context;
    private ListenerManager listenerManager;//监听管理器
    private HashMap internalJobListeners;
    private HashMap internalTriggerListeners;
    private ArrayList internalSchedulerListeners;
    private JobFactory jobFactory;
    ExecutingJobsManager jobMgr;
    ErrorLogger errLogger;
    private SchedulerSignaler signaler;
    private Random random;
    private ArrayList holdToPreventGC;
    private boolean signalOnSchedulingChange;
    private volatile boolean closed;
    private volatile boolean shuttingDown;
    private boolean boundRemotely;
    private QuartzSchedulerMBean jmxBean;
    private Date initialStart;
    private final Timer updateTimer;
}

//job存储器
public class RAMJobStore
    implements JobStore
{
//暂停触发器
 public void pauseTrigger(TriggerKey triggerKey)
    {
        TriggerWrapper tw;
label0:
        {
	    //获取锁
            synchronized(lock)
            {
                tw = (TriggerWrapper)triggersByKey.get(triggerKey);
                if(tw != null && tw.trigger != null)
                    break label0;
            }
            return;
        }
        if(tw.state != 3)
            break MISSING_BLOCK_LABEL_44;
        obj;
        JVM INSTR monitorexit ;
        return;
	//如果处于阻塞状态,则triggerKey为暂停阻塞,否则置为暂停状态
        if(tw.state == 5)
            tw.state = 6;
        else
            tw.state = 4;
	//并从timeTriggers移除
        timeTriggers.remove(tw);
        obj;
        JVM INSTR monitorexit ;
          goto _L1
        exception;
        throw exception;
_L1:
    }
     public void resumeTrigger(TriggerKey triggerKey)
    {
        TriggerWrapper tw;
label0:
        {
            synchronized(lock)
            {
                tw = (TriggerWrapper)triggersByKey.get(triggerKey);
                if(tw != null && tw.trigger != null)
                    break label0;
            }
            return;
        }
        OperableTrigger trig;
        trig = tw.getTrigger();
        if(tw.state == 4 || tw.state == 6)
            break MISSING_BLOCK_LABEL_59;
        obj;
        JVM INSTR monitorexit ;
        return;
	//如果blockedJobs包含取法任务,则则triggerKey为阻塞,否则置为等待状态
        if(blockedJobs.contains(trig.getJobKey()))
            tw.state = 5;
        else
            tw.state = 0;
        applyMisfire(tw);
	//如果调度器状态为等待,则加入到调度容器中
        if(tw.state == 0)
            timeTriggers.add(tw);
        obj;
        JVM INSTR monitorexit ;
          goto _L1
        exception;
        throw exception;
_L1:
    }
    //暂定job
     public void pauseJob(JobKey jobKey)
    {
        synchronized(lock)
        {
	    //获取触发任务
            List triggersOfJob = getTriggersForJob(jobKey);
            OperableTrigger trigger;
	    //关键调用pauseTrigger(TriggerKey triggerKey)
            for(Iterator i$ = triggersOfJob.iterator(); i$.hasNext(); pauseTrigger(trigger.getKey()))
                trigger = (OperableTrigger)i$.next();

        }
    }
    //恢复job
    public void resumeJob(JobKey jobKey)
    {
        synchronized(lock)
        {   
	    //获取触发任务
            List triggersOfJob = getTriggersForJob(jobKey);
            OperableTrigger trigger;
	    //调resumeTrigge()
            for(Iterator i$ = triggersOfJob.iterator(); i$.hasNext(); resumeTrigger(trigger.getKey()))
                trigger = (OperableTrigger)i$.next();

        }
    }
    //获取jobkey的触发任务
    public List getTriggersForJob(JobKey jobKey)
    {
        ArrayList trigList = new ArrayList();
        synchronized(lock)
        {
            Iterator i$ = triggers.iterator();
            do
            {
                if(!i$.hasNext())
                    break;
                TriggerWrapper tw = (TriggerWrapper)i$.next();
                if(tw.jobKey.equals(jobKey))
                    trigList.add((OperableTrigger)tw.trigger.clone());
            } while(true);
        }
        return trigList;
    }

    protected HashMap jobsByKey;//HashMap<JobKey,JobWrapper>  
    protected HashMap triggersByKey;//HashMap<TrrigerKey,TriggerWrapper>  
    protected HashMap jobsByGroup;//HashMap<String,HashMap<JobKey,JobWrapper>>,,key为JobKey.group  
    protected HashMap triggersByGroup;//HashMap<String,HashMap<TrrigerKey,TriggerWrapper>>,,key为TrrigerKey.group  
    protected TreeSet timeTriggers;//TreeSet<TrrigerWrapper> 红黑树  
    protected HashMap calendarsByName;  
    protected ArrayList triggers; //List<TriggerWrapper>  
    protected final Object lock = new Object();  
    protected HashSet pausedTriggerGroups;  
    protected HashSet pausedJobGroups;  
    protected HashSet blockedJobs;  
    protected long misfireThreshold;  
    protected SchedulerSignaler signaler;  
    private final Logger log = LoggerFactory.getLogger(getClass());  
    private static final AtomicLong ftrCtr = new AtomicLong(System.currentTimeMillis());  
}  

//TriggerKey,JobKey包装类
class TriggerWrapper
{

    TriggerWrapper(OperableTrigger trigger)
    {
        state = 0;
        if(trigger == null)
        {
            throw new IllegalArgumentException("Trigger cannot be null!");
        } else
        {
            this.trigger = trigger;
            key = trigger.getKey();
            jobKey = trigger.getJobKey();
            return;
        }
    }

    public boolean equals(Object obj)
    {
        if(obj instanceof TriggerWrapper)
        {
            TriggerWrapper tw = (TriggerWrapper)obj;
            if(tw.key.equals(key))
                return true;
        }
        return false;
    }

    public int hashCode()
    {
        return key.hashCode();
    }

    public OperableTrigger getTrigger()
    {
        return trigger;
    }

    public final TriggerKey key;
    public final JobKey jobKey;
    public final OperableTrigger trigger;
    public int state;
    public static final int STATE_WAITING = 0;//等待
    public static final int STATE_ACQUIRED = 1;//就绪
    public static final int STATE_EXECUTING = 2;//执行
    public static final int STATE_COMPLETE = 3;//完成
    public static final int STATE_PAUSED = 4;//暂停
    public static final int STATE_BLOCKED = 5;//阻塞
    public static final int STATE_PAUSED_BLOCKED = 6;//暂停阻塞
    public static final int STATE_ERROR = 7;//错误
}

总结:
调度器暂停和恢复trriger,是通过改变triggersByKey中TriggerWrapper的TrrgerKey为trrigerKey的状态,暂定和恢复job是通过暂停和恢复triggers容器中JobKey为jobKey的TriggerWrapper。
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics