`
Donald_Draper
  • 浏览: 950319 次
社区版块
存档分类
最新评论

ThreadPoolExecutor解析四(线程池关闭)

    博客分类:
  • JUC
阅读更多
Executor接口的定义:http://donald-draper.iteye.com/blog/2365625
ExecutorService接口定义:http://donald-draper.iteye.com/blog/2365738
Future接口定义:http://donald-draper.iteye.com/blog/2365798
FutureTask解析:http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义:http://donald-draper.iteye.com/blog/2366239
ExecutorCompletionService解析:http://donald-draper.iteye.com/blog/2366254
AbstractExecutorService解析:http://donald-draper.iteye.com/blog/2366348
ScheduledExecutorService接口定义:http://donald-draper.iteye.com/blog/2366436
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等) :
http://donald-draper.iteye.com/blog/2366934
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
http://donald-draper.iteye.com/blog/2367064
ThreadPoolExecutor解析三(线程池执行提交任务):
http://donald-draper.iteye.com/blog/2367199
上一篇看了线程池执行提交任务,先回顾一下:
执行任务的过程为,如果工作线程数量小于核心线程池数量,添加工作线程,执行任务;如果添加工作线程失败,则添加任务到队列,并进行双检查,如果在上述期间,线程池关闭,回滚任务,从队列中移除任务;如果任务入队列失败,根据工作线程数量是否大于最大线程池数量,来判断是否应该添加工作线程执行任务;如果工作线程小于最大线程池数量,则CAS操作workCount,成功创建工作线程执行任务。添加工作线程的过程为,如果应该添加工作线程,则CAS更新工作线程数,如果更新成功,则创建工作线程,执行任务。如果线程是已关闭或正在关闭,则添加工作线程失败。如果线程工厂创建线程失败,则返回false,如果由于线程工厂返回null或OutOfMemoryError等原因,执行回滚清除工作。回滚清除工作主要是工作线程和工作线程数。最后检查线程是是否关闭,如果线程池正在运行,或正在关闭且队列不为空,则直接返回,否则及线程池已关闭,检查工作线程是否为0,不为零根据ONLY_ONE判断中断一个空闲线程还是多个。
今天来看一下线程池的关闭:
/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     先前提交的任务将会被工作线程执行,新的线程将会被拒绝。这个方法
     不会等待提交的任务执行完,我们可以用awaitTermination来等待任务执行完。
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
	    //检查线程访问权限
            checkShutdownAccess();
	    //更新线程池状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
	    //中断空闲工作线程
            interruptIdleWorkers();
	    //线程池关闭hook
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
	//尝试结束线程池,这个前面以说,这里不再说
        tryTerminate();
    }

分四步来看:
1.
//检查线程访问权限
checkShutdownAccess();

2.
//更新线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);

3.
//中断空闲工作线程
interruptIdleWorkers();

4.
//线程池关闭hook
onShutdown(); // hook for ScheduledThreadPoolExecutor

下面来看每一点:
1.
//检查线程访问权限
checkShutdownAccess();

 /**
     * If there is a security manager, makes sure caller has
     * permission to shut down threads in general (see shutdownPerm).
     * If this passes, additionally makes sure the caller is allowed
     * to interrupt each worker thread. This might not be true even if
     * first check passed, if the SecurityManager treats some threads
     * specially.
     */
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
		    //遍历工作线程集,检查任务线程访问权限
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }

2.
//更新线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);

 /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

3.
//中断空闲工作线程
interruptIdleWorkers();


/**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
       //遍历工作线程集合,中断空闲工作线程,前面已将,这里不再说
        interruptIdleWorkers(false);
    }

   
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
	        //遍历工作线程集
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {//锁打开说明,工作线程空闲
                    try {
		        //如果工作线程非中断,且空闲,尝试获取锁,获取锁成功,则中断工作线程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
		    //如果是只中断一个空闲线程,则结束本次中断空闲线程任务
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

//只中断一个空闲工作线程
private static final boolean ONLY_ONE = true;


4.
//线程池关闭hook
onShutdown(); // hook for ScheduledThreadPoolExecutor
   /**
     * Performs any further cleanup following run state transition on
     * invocation of shutdown.  A no-op here, but used by
     * ScheduledThreadPoolExecutor to cancel delayed tasks.
     */
    void onShutdown() {
    //待子类扩展
    }


我们也把tryTerminate代码贴出来,以便理解:
final void tryTerminate() {
        //自旋尝试关闭线程池
        for (;;) {
            int c = ctl.get();
	    //如果线程池正在运行,或正在关闭且任务队列不为空,则返回
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
	        //如果工作线程不为空,则中断空闲工作线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
	        //线程池已关闭,任务队列为空,工作线程为0,更新线程池状态为TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
		        //执行结束工作
                        terminated();
                    } finally {
		        //线程池已结束
                        ctl.set(ctlOf(TERMINATED, 0));
			//唤醒等待线程池结束的线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

//执行结束工作
terminated();

/**
     * Method invoked when the Executor has terminated.  Default
     * implementation does nothing. Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     
     * {@code super.terminated} within this method.
     */
    protected void terminated() { 
    //待子类扩展}

从上面可以看出关闭线程池,首先检查工作线程运行时访问权限,
更新线程状态为SHUTDOWN,中断空闲工作线程,最后尝试关闭线程池。
再看来以及关闭线程池:
/**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空,
     并返回任务队列中的任务集。
     这个方法不会等待已执行的任务结束,我们用awaitTermination来等待任务执行完
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
	    //检查工作线程权限
            checkShutdownAccess();
	    //更新线程池状态为STOP
            advanceRunState(STOP);
	    //中断空闲工作线程
            interruptWorkers();
            //清空任务队列,并放到tasks集合中
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
	//尝试结束线程池
        tryTerminate();
        return tasks;
    }

来看清空任务队列,并放到tasks集合中
 tasks = drainQueue();


/**
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
    private List<Runnable> drainQueue() {
        //这个方法很简单,不再说了
        BlockingQueue<Runnable> q = workQueue;
        List<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

立即关闭线程与关闭线程池的不同是,对于关闭线程池,先前提交的任务将会被工作线程执行,新的线程将会被拒绝;而立即关闭线程,尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空,并返回任务队列中的任务集。这两个方法都不会等待任务执行完或任务结束。
我们可以用awaitTermination来等待任务执行完
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
	    //自旋等待线程线程结束条件
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

来看一些其他的方法:
/**
     * Invokes {@code shutdown} when this executor is no longer
     * referenced and it has no threads.
     */
    //线程池不在被应用时,关闭线程池
    protected void finalize() {
        shutdown();
    }
//创建一个核心空闲工作线程等待任务
 /**
     * Starts a core thread, causing it to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed. This method will return {@code false}
     * if all core threads have already been started.
     *
     * @return {@code true} if a thread was started
     */
    public boolean prestartCoreThread() {
        //addWorker,core参数为true使用核心线程池数量,否则最大线程池数量
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }
//确保至少有一个空闲工作线程等待任务
/**
     * Same as prestartCoreThread except arranges that at least one
     * thread is started even if corePoolSize is 0.
     */
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }
//创建核心线程池数量的空闲工作线程等待任务
    /**
     * Starts all core threads, causing them to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed.
     *
     * @return the number of threads started
     */
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }
//尝试移除任务取消的工作线程
 /**
     * Tries to remove from the work queue all {@link Future}
     * tasks that have been cancelled. This method can be useful as a
     * storage reclamation operation, that has no other impact on
     * functionality. Cancelled tasks are never executed, but may
     * accumulate in work queues until worker threads can actively
     * remove them. Invoking this method instead tries to remove them now.
     * However, this method may fail to remove tasks in
     * the presence of interference by other threads.
     */
    public void purge() {
        final BlockingQueue<Runnable> q = workQueue;
        try {
            Iterator<Runnable> it = q.iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    it.remove();
            }
        } catch (ConcurrentModificationException fallThrough) {
            // Take slow path if we encounter interference during traversal.
            // Make copy for traversal and call remove for cancelled entries.
            // The slow path is more likely to be O(N*N).
            for (Object r : q.toArray())
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    q.remove(r);
        }

        tryTerminate(); // In case SHUTDOWN and now empty
    }
 //是否关闭,大于运行状态,即为关闭
  public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }
   private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
//线程池关闭,但还没有完全结束
/**
     * Returns true if this executor is in the process of terminating
     * after {@link #shutdown} or {@link #shutdownNow} but has not
     * completely terminated.  This method may be useful for
     * debugging. A return of {@code true} reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, causing this executor not
     * to properly terminate.
     *
     * @return true if terminating but not yet terminated
     */
    public boolean isTerminating() {
        int c = ctl.get();
        return ! isRunning(c) && runStateLessThan(c, TERMINATED);
    }
 //线程池是否结束
  public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }
  
//设置工作线程保活时间
   /**
     * Sets the time limit for which threads may remain idle before
     * being terminated.  If there are more than the core number of
     * threads currently in the pool, after waiting this amount of
     * time without processing a task, excess threads will be
     * terminated.  This overrides any value set in the constructor.
     *
     * @param time the time to wait.  A time value of zero will cause
     *        excess threads to terminate immediately after executing tasks.
     * @param unit the time unit of the {@code time} argument
     * @throws IllegalArgumentException if {@code time} less than zero or
     *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
     * @see #getKeepAliveTime
     */
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        if (delta < 0)
	    //如果保证时间小于当前保证时间,中断空闲工作线程
            interruptIdleWorkers();
    }

//运行工作线程超时等待

/**
     * Sets the policy governing whether core threads may time out and
     * terminate if no tasks arrive within the keep-alive time, being
     * replaced if needed when new tasks arrive. When false, core
     * threads are never terminated due to lack of incoming
     * tasks. When true, the same keep-alive policy applying to
     * non-core threads applies also to core threads. To avoid
     * continual thread replacement, the keep-alive time must be
     * greater than zero when setting {@code true}. This method
     * should in general be called before the pool is actively used.
     *
     * @param value {@code true} if should time out, else {@code false}
     * @throws IllegalArgumentException if value is {@code true}
     *         and the current keep-alive time is not greater than zero
     *
     * @since 1.6
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }
 
//设置核心线程池数量
  /**
     * Sets the core number of threads.  This overrides any value set
     * in the constructor.  If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle.  If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @see #getCorePoolSize
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
	    //如果当前工作线程数量大于corePoolSize,中断空闲工作线程
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
	    //如果核心线程池数量增大,且工作线程未达到核心线程池数量,则添加等待执行的任务数量
	    //和新增核心线程池数量中最小者数量的空闲工作线程
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
//设置最大线程池数量
/**
     * Sets the maximum allowed number of threads. This overrides any
     * value set in the constructor. If the new value is smaller than
     * the current value, excess existing threads will be
     * terminated when they next become idle.
     *
     * @param maximumPoolSize the new maximum
     * @throws IllegalArgumentException if the new maximum is
     *         less than or equal to zero, or
     *         less than the {@linkplain #getCorePoolSize core pool size}
     * @see #getMaximumPoolSize
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
	    //如果当前工作线程数量大于最大线程池数量,则中断空闲工作线程
            interruptIdleWorkers();
    }

总结:
关闭线程池,首先检查工作线程运行时访问权限,更新线程状态为SHUTDOWN,中断空闲工作线程,最后尝试关闭线程池。立即关闭线程与关闭线程池的不同是,对于关闭线程池,先前提交的任务将会被工作线程执行,新的线程将会被拒绝;而立即关闭线程,尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空,并返回任务队列中的任务集。这两个方法都不会等待任务执行完或任务结束。我们可以用awaitTermination来等待任务执行完。
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics