`
Donald_Draper
  • 浏览: 945696 次
社区版块
存档分类
最新评论

netty 单线程事件执行器执行任务与graceful方式关闭

阅读更多
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
netty 单线程事件执行器初始化:http://donald-draper.iteye.com/blog/2391895

引言:
上一篇文章我们看一单线程任务的内部变量和初始化,先来回顾一下:
    单线程事件执行器SingleThreadEventExecutor,内部主要有一个状态变量STATE_UPDATER(AtomicIntegerFieldUpdater),执行器状态以供有4中就绪,开始,正在关闭,已关闭,终止;一个任务队列taskQueue存放待执行的任务线程;一个执行器执行任务taskQueue(LinkedBlockingQueue);一个事件执行器关闭信号量threadLock控制事件执行器的关闭;一个是高可见线程thread,指定当前事件执行器线程,用于判断IO操作线程是否在当前事件循环中;
    单线程事件执行器构造,主要是初始化父事件执行器,最大任务数,事件执行器,任务队列和任务拒绝策略,默认拒绝策略为直接抛出拒绝执行器异常。由于单线程事件执行器为顺序执行器OrderedEventExecutor,其主要通过taskQueue为LinkedBlockQueue保证任务的顺序执行。

今天我们来看单线程事件执行器,执行任务等方法:
先把单线程事件执行器内部变量贴出来以便理解相关方法,
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    //最大执行任数,最小为16
    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
    //事件执行器状态
    private static final int ST_NOT_STARTED = 1;//就绪
    private static final int ST_STARTED = 2;//开始
    private static final int ST_SHUTTING_DOWN = 3;//正在关闭
    private static final int ST_SHUTDOWN = 4;//已关闭
    private static final int ST_TERMINATED = 5;//终止
    //唤醒任务
    private static final Runnable WAKEUP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };
    //空任务
    private static final Runnable NOOP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };
    //事件执行器状态
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    //线程属性
    private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
   //任务队列
    private final Queue<Runnable> taskQueue;
    private volatile Thread thread;//当前事件执行器线程
    @SuppressWarnings("unused")//线程属性
    private volatile ThreadProperties threadProperties;
    private final Executor executor;//内部执行器
    private volatile boolean interrupted;//是否中断
    private final Semaphore threadLock = new Semaphore(0);//事件执行器终止信号量
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();//关闭Hooks任务
    private final boolean addTaskWakesUp;
    private final int maxPendingTasks;//最大执行器任务
    private final RejectedExecutionHandler rejectedExecutionHandler;//任务拒绝策略
    private long lastExecutionTime;//上次执行器时间
    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;//执行器初始状态

    private volatile long gracefulShutdownQuietPeriod;//关闭间隔QuietPeriod
    private volatile long gracefulShutdownTimeout;//关闭超时时间
    private long gracefulShutdownStartTime;//关闭开始时间
    //终止异步任务结果
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
}

下面先来看执行任务方法
 @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
       //判断线程是否在当前事务循环中
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
	    //如果在事务循环中,则添加任务到任务队列
            addTask(task);
        } else {
	    //否则,当前事件执行器线程
            startThread();
	    //添加任务到任务队列
            addTask(task);
	    //如果事件执行器,已关闭,则移除任务,抛出拒绝执行异常
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        //如果添加任务不唤醒事件循环且执行任务唤醒事件循环,则唤醒事件循环
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
}

上面方法有一下几点要看:
1.
//判断线程是否在当前事务循环中
boolean inEventLoop = inEventLoop();


2.
if (inEventLoop) {
    //如果在事务循环中,则添加任务到任务队列
    addTask(task);
}

3.
else {
    //否则,当前事件执行器线程
    startThread();
    //添加任务到任务队列
    addTask(task);
    //如果事件执行器,已关闭,则移除任务,抛出拒绝执行异常
    if (isShutdown() && removeTask(task)) {
        reject();
    }
}


4.
//如果添加任务不唤醒事件循环且执行任务唤醒事件循环,则唤醒事件循环
 if (!addTaskWakesUp && wakesUpForTask(task)) {
     wakeup(inEventLoop);
 }


下面分别来看这几点

1.
//判断线程是否在当前事务循环中
boolean inEventLoop = inEventLoop();

//AbstractEventExecutor
@Override
  public boolean inEventLoop() {
      return inEventLoop(Thread.currentThread());
  }

//SingleThreadEventExecutor
@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}


2.
if (inEventLoop) {
    //如果在事务循环中,则添加任务到任务队列
    addTask(task);
}

/**
 * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
 * before.
 在单线程事件执行器关闭前,添加任务到任务队列,或者抛出拒绝执行任务异常
 */
protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    //如果添加任务到队列失败,则抛出抛出拒绝执行任务异常
    if (!offerTask(task)) {
        reject(task);
    }
}
final boolean offerTask(Runnable task) {
    //如果关闭,则拒绝执行器任务,否则添加任务到任务队列
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}
/**
  * Offers the task to the associated {@link RejectedExecutionHandler}.
  将拒绝执行的任务委托给拒绝执行任务Handler处理
  * @param task to reject.
  */
 protected final void reject(Runnable task) {
     rejectedExecutionHandler.rejected(task, this);
 }
//直接抛出拒绝执行任务异常
protected static void reject() {
        throw new RejectedExecutionException("event executor terminated");
}

从这一点来看,当执行器关闭时,直接直接抛出拒绝执行任务异常,如果没关闭,
则将拒绝执行的任务委托给拒绝执行任务Handler处理。
3.
else {
    //否则,当前事件执行器线程
    startThread();
    //添加任务到任务队列
    addTask(task);
    //如果事件执行器,已关闭,则移除任务,抛出拒绝执行异常
    if (isShutdown() && removeTask(task)) {
        reject();
    }
}

//启动事件执行器线程
private void startThread() {
    if (state == ST_NOT_STARTED) {
        //更新执行器状态为已启动,并完成实际启动工作
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}
//完成实际启动工作
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
	   //初始化事件执行器线程,用于判断线程是否在当前事件循环中
            thread = Thread.currentThread();
            if (interrupted) {
	        //如果执行器线程中断,消除中断位
                thread.interrupt();
            }
            boolean success = false;
	    //更新执行器上次执行器事件
            updateLastExecutionTime();
            try {
	        //启动事件执行器线程
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
		    //自旋,等待执行器关闭,并更新执行器状态为已关闭
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
		//检查confirmShutdown方法是否在循环最后调用
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
		        /确定执行器关闭
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
		        //完成清理工作
                        cleanup();
                    } finally {
		        //更新执行器状态为已终止
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
			//释放线程锁
                        threadLock.release();
			//如果任务线程队列不为空,则警告
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                        }
                        //设置终止任务结果为成功
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

在实际完成事件执行器启动工作方法中,事件完成工作通过一个线程完成,并有内部执行器执行;线程事件工作在一个try,finally语句块中,在finally语句块中自旋等待执行器关闭,并完成关闭任务。
来看上述方法需要关注的几点:
3.1
//更新执行器上次执行器事件
updateLastExecutionTime();

/**
 * Updates the internal timestamp that tells when a submitted task was executed most recently.
 * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
 * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
 * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
 * checks.
 */
protected void updateLastExecutionTime() {
    lastExecutionTime = ScheduledFutureTask.nanoTime();
}


3.2
//启动事件执行器线程
SingleThreadEventExecutor.this.run();

/**
 *待子类实现
 */
protected abstract void run();


3.3
/确定执行器关闭
if (confirmShutdown()) {
    break;
}

/**
 * Confirm that the shutdown if the instance should be done now!
 确认事件执行器关闭
 */
protected boolean confirmShutdown() {
    //如果没关闭,返回false
    if (!isShuttingDown()) {
        return false;
    }
    //如果线程不在当前事件循环中,抛出非法状态异常
    if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }
    //取消调度任务
    cancelScheduledTasks();
    //更新关闭事件
    if (gracefulShutdownStartTime == 0) {
        gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
    }
    //执行所有任务或关闭Hooks线程成功
    if (runAllTasks() || runShutdownHooks()) {
        if (isShutdown()) {
	    //已关闭
            // Executor shut down - no new tasks anymore.
            return true;
        }

        // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
        // terminate if the quiet period is 0.
        // See https://github.com/netty/netty/issues/4241
        if (gracefulShutdownQuietPeriod == 0) {
            return true;
        }
        wakeup(true);
        return false;
    }
    //获取调度任务当前事件
    final long nanoTime = ScheduledFutureTask.nanoTime();
    //如果已经关闭,或关闭超时,返回true
    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }
    //如果关闭间隔时间没过完
    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        // Check if any tasks were added to the queue every 100ms.
        // TODO: Change the behavior of takeTask() so that it returns on timeout.
	//则继续执行提交的任务
        wakeup(true);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Ignore
        }

        return false;
    }

    // No tasks were added for last quiet period - hopefully safe to shut down.
    // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
    return true;
}

来看取消调度任务
//取消调度任务
cancelScheduledTasks();
这个方法,在前面文章中已说,简单看下就行
 /**
  * Cancel all scheduled tasks.
  *
  * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
  */
 protected void cancelScheduledTasks() {
     assert inEventLoop();
     Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
     if (isNullOrEmpty(scheduledTaskQueue)) {
         return;
     }

     final ScheduledFutureTask<?>[] scheduledTasks =
             scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);

     for (ScheduledFutureTask<?> task: scheduledTasks) {
         task.cancelWithoutRemove(false);
     }

     scheduledTaskQueue.clear();
 }


再来看执行所有任务或关闭Hooks线程
//执行所有任务或关闭Hooks线程成功
if (runAllTasks() || runShutdownHooks()) {
    if (isShutdown()) {
	    //已关闭
        // Executor shut down - no new tasks anymore.
        return true;
    }

    // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
    // terminate if the quiet period is 0.
    // See https://github.com/netty/netty/issues/4241
    if (gracefulShutdownQuietPeriod == 0) {
        return true;
    }
    wakeup(true);
    return false;
}
/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
 *从任务队列take所有任务,并执行器
 * @return {@code true} if and only if at least one task was run
 */
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        //从调度任务队列抓取任务,添加到任务队列中
        fetchedAll = fetchFromScheduledTaskQueue();
        //从任务队列take任务,并执行
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

    if (ranAtLeastOne) {
        //更新最后执行时间
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    //完成运行所有任务后的工作
    afterRunningAllTasks();
    return ranAtLeastOne;
}
//从调度任务队列抓取任务,添加到任务队列中
private boolean fetchFromScheduledTaskQueue() {
     long nanoTime = AbstractScheduledEventExecutor.nanoTime();
     Runnable scheduledTask  = pollScheduledTask(nanoTime);
     while (scheduledTask != null) {
         if (!taskQueue.offer(scheduledTask)) {
             // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
             scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
             return false;
         }
         scheduledTask  = pollScheduledTask(nanoTime);
     }
     return true;
 }
 /**
 * Runs all tasks from the passed {@code taskQueue}.
 *从任务队列take任务,并执行
 * @param taskQueue To poll and execute all tasks.
 *
 * @return {@code true} if at least one task was executed.
 */
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        safeExecute(task);
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            return true;
        }
    }
}
//从任务队列拉取任务
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) {
        Runnable task = taskQueue.poll();
        if (task == WAKEUP_TASK) {
	    //如果是唤醒任务,则跳过
            continue;
        }
        return task;
    }
}
//AbstractEventExecutor
/**
  * Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
  */
 protected static void safeExecute(Runnable task) {
     try {
         task.run();
     } catch (Throwable t) {
         logger.warn("A task raised an exception. Task: {}", task, t);
     }
 }

/**
 * Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}.
 待子类实现
 */
@UnstableApi
protected void afterRunningAllTasks() { }


在来看运行关闭Hook线程:
private boolean runShutdownHooks() {
    boolean ran = false;
    // Note shutdown hooks can add / remove shutdown hooks.
    while (!shutdownHooks.isEmpty()) {
        //如果关闭hook线程集合不为空,执行hook线程。
        List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
        shutdownHooks.clear();

        for (Runnable task: copy) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("Shutdown hook raised an exception.", t);
            } finally {
                ran = true;
            }
        }
    }
    if (ran) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    return ran;
}

3.4
//完成清理工作
cleanup();

/**
 * Do nothing, sub-classes may override
 待子类实现
 */
protected void cleanup() {
    // NOOP
}


4.
//如果添加任务不唤醒事件循环且执行任务唤醒事件循环,则唤醒事件循环
 if (!addTaskWakesUp && wakesUpForTask(task)) {
     wakeup(inEventLoop);
 }

@SuppressWarnings("unused")
protected boolean wakesUpForTask(Runnable task) {
    return true;
}
//添加唤醒线程到任务队列
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop || state == ST_SHUTTING_DOWN) {
        // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
        // is already something in the queue.
        taskQueue.offer(WAKEUP_TASK);
    }
}

从上面可看出,单线程事件执行器,执行任务,首先判断任务是否为null,为空抛出空指针异常,否则,判断线程是否在当前事件循环中,在则添加任务到任务队列,否则开启当前单线程事件执行器,并添加任务到任务队列,如果此时事件执行器已关闭,并可以移除任务,则抛出拒绝执行器任务异常;如果需要启动事件执行器唤醒线程,则添加唤醒线程到任务队列。

再来看其他方法:
/**
 * Interrupt the current running {@link Thread}.
 中断时间执行器线程
 */
protected void interruptThread() {
    Thread currentThread = thread;
    if (currentThread == null) {
        interrupted = true;
    } else {
        currentThread.interrupt();
    }
}
//从任务队列拉取任务
/**
 * @see Queue#poll()
 */
protected Runnable pollTask() {
    assert inEventLoop();
    return pollTaskFrom(taskQueue);
}

protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) {
        Runnable task = taskQueue.poll();
        if (task == WAKEUP_TASK) {
            continue;
        }
        return task;
    }
}
//移除任务
/**
 * @see Queue#remove(Object)
 */
protected boolean removeTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    return taskQueue.remove(task);
}
//检查队头任务
 /**
 * @see Queue#peek()
 */
protected Runnable peekTask() {
    assert inEventLoop();
    return taskQueue.peek();
}
//判断任务队列是否还有任务
/**
 * @see Queue#isEmpty()
 */
protected boolean hasTasks() {
    assert inEventLoop();
    return !taskQueue.isEmpty();
}

/**
 * Return the number of tasks that are pending for processing.
 *获取任务队列当前任务数
 * [b]Be aware that this operation may be expensive as it depends on the internal implementation of the
 * SingleThreadEventExecutor. So use it was care![/b]
 */
public int pendingTasks() {
    return taskQueue.size();
}
//添加hook线程到,关闭hook线程集合
/**
 * Add a {@link Runnable} which will be executed on shutdown of this instance
 */
public void addShutdownHook(final Runnable task) {
    if (inEventLoop()) {
        shutdownHooks.add(task);
    } else {
        execute(new Runnable() {
            @Override
            public void run() {
                shutdownHooks.add(task);
            }
        });
    }
}
//从关闭hook线程集合中,移除hook线程
/**
 * Remove a previous added {@link Runnable} as a shutdown hook
 */
public void removeShutdownHook(final Runnable task) {
    if (inEventLoop()) {
        shutdownHooks.remove(task);
    } else {
        execute(new Runnable() {
            @Override
            public void run() {
                shutdownHooks.remove(task);
            }
        });
    }
}
//返回下一个调度任务的延时时间
/**
 * Returns the amount of time left until the scheduled task with the closest dead line is executed.
 */
protected long delayNanos(long currentTimeNanos) {
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    if (scheduledTask == null) {
        return SCHEDULE_PURGE_INTERVAL;
    }

    return scheduledTask.delayNanos(currentTimeNanos);
}

从上面可以看出,添加,移除,poll任务操作,实际委托给任务队列,
添加,移除hook线程操作委托给关闭hooks线程集合。
来看从任务队列take任务
/**
 * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
 * 
 * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
 * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
 * 

 *
 * @return {@code null} if the executor thread has been interrupted or waken up.
 */
protected Runnable takeTask() {
    assert inEventLoop();
    if (!(taskQueue instanceof BlockingQueue)) {
        throw new UnsupportedOperationException();
    }
    //获取当前任务队列
    BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
    for (;;) {
       //从调度任务队列peek头部调度任务
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            Runnable task = null;
            try {
	        //如果调度任务为空,则从任务队列take一个任务
                task = taskQueue.take();
                if (task == WAKEUP_TASK) {
                    task = null;
                }
            } catch (InterruptedException e) {
                // Ignore
            }
            return task;
        } else {
	    //否则,获取调度任务延时时间
            long delayNanos = scheduledTask.delayNanos();
            Runnable task = null;
            if (delayNanos > 0) {
                try {
		    //如果延时时间大于0,则从任务队列超时poll任务
                    task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // Waken up.
                    return null;
                }
            }
            if (task == null) {
                // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                // scheduled tasks are never executed if there is always one task in the taskQueue.
                // This is for example true for the read task of OIO Transport
                // See https://github.com/netty/netty/issues/1614
		//从调度任务队列抓取调度任务,添加到任务队列
                fetchFromScheduledTaskQueue();
		//从任务队列poll任务
                task = taskQueue.poll();
            }

            if (task != null) {
                return task;
            }
        }
    }
}

从上面可以看出,单线程事件执行器take任务,首先从调度任务队列peek头部调度任务,
如果任务不为空,则获取调度任务延时时间,如果延时时间大于0,则从任务队列超时poll任务,否则从调度任务队列抓取调度任务,添加到任务队列,并从任务队列poll任务;如果调度任务为空,则从任务队列take一个任务,如果是唤醒任务,则忽略。

再来看超时运行所有任务:

 /**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
 * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
 超时运行所有任务,即从任务队列拉取任务,并执行,如果任务执行时间超过timeoutNanos,则停止执行任务
 */
protected boolean runAllTasks(long timeoutNanos) {
    //从调度任务抓取任务队列,并添加到任务队列
    fetchFromScheduledTaskQueue();
    //从任务队列poll任务
    Runnable task = pollTask();
    if (task == null) {
        //任务为空,则完成执行结束任务
        afterRunningAllTasks();
        return false;
    }
    //超时时间
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        //自旋安全执行任务
        safeExecute(task);

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
	//没64个任务检查一下任务执行时间有没有超时
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

从上面可以看出,超时运行所有任务,即从调度任务队列,抓取任务,方法任务队列,
从任务队列拉取任务,并执行,如果任务执行时间超过timeoutNanos,则停止执行任务。

来看关闭单线程执行器
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    //检查间隔、超时时间,时间单元参数,并且间隔时间要小于超时时间
    if (quietPeriod < 0) {
        throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
    }
    if (timeout < quietPeriod) {
        throw new IllegalArgumentException(
                "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    //如果已经关闭,则返回异步关闭任务结果
    if (isShuttingDown()) {
        return terminationFuture();
    }
    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    for (;;) {
         //如果已经关闭,则返回异步关闭任务结果
        if (isShuttingDown()) {
            return terminationFuture();
        }
        int newState;
        wakeup = true;
        oldState = state;
	//如果线程在当前事务循环,则更新状态为正在关闭
        if (inEventLoop) {
            newState = ST_SHUTTING_DOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                    newState = ST_SHUTTING_DOWN;
                    break;
                default:
                    newState = oldState;
                    wakeup = false;
            }
        }
	//更新状态
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            break;
        }
    }
    //计算关闭间隔和超时时间,这个会在执行任务方法中的finally语句块中,用到
    gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    gracefulShutdownTimeout = unit.toNanos(timeout);
    if (oldState == ST_NOT_STARTED) {
        doStartThread();
    }

    if (wakeup) {
        //执行唤醒任务线程
        wakeup(inEventLoop);
    }

    return terminationFuture();
}

从上面可以看出,关闭单线程执行器,首先检查间隔、超时时间,时间单元参数,并且间隔时间要小于超时时间,
如果已经关闭,则返回异步关闭任务结果,否则检查线程是否在当前事务循环中,如果是则更新状态为正在关闭,
并计算计算关闭间隔和超时时间。

来看已经丢弃的关闭方法
 @Override
 @Deprecated
 public void shutdown() {
     if (isShutdown()) {
         return;
     }

     boolean inEventLoop = inEventLoop();
     boolean wakeup;
     int oldState;
     for (;;) {
         if (isShuttingDown()) {
             return;
         }
         int newState;
         wakeup = true;
         oldState = state;
         if (inEventLoop) {
             newState = ST_SHUTDOWN;
         } else {
             switch (oldState) {
                 case ST_NOT_STARTED:
                 case ST_STARTED:
                 case ST_SHUTTING_DOWN:
                     newState = ST_SHUTDOWN;
                     break;
                 default:
                     newState = oldState;
                     wakeup = false;
             }
         }
         if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
             break;
         }
     }

     if (oldState == ST_NOT_STARTED) {
         doStartThread();
     }

     if (wakeup) {
         wakeup(inEventLoop);
     }
 }


shutdown与shutdownGracefully方法的不同在于,关闭执行器时,shutdownGracefully有一个缓冲的时间和任务执行的超时时间,以便将任务队列中的任务尽量在超时时间内执行完。

@Override
 public boolean isShuttingDown() {
     return state >= ST_SHUTTING_DOWN;
 }

 @Override
 public boolean isShutdown() {
     return state >= ST_SHUTDOWN;
 }

 @Override
 public boolean isTerminated() {
     return state == ST_TERMINATED;
 }
@Override
 public Future<?> terminationFuture() {
     return terminationFuture;
 }

再来看超时等待终止执行器
@Override
 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
     if (unit == null) {
         throw new NullPointerException("unit");
     }

     if (inEventLoop()) {
         throw new IllegalStateException("cannot await termination of the current thread");
     }
     //尝试获取线程锁,这个锁在执行器终止时,释放。
     if (threadLock.tryAcquire(timeout, unit)) {
         threadLock.release();
     }

     return isTerminated();
 }

再来看获取执行器线程属性:
/**
  * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
  * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until the
  * it is fully started.
  返回执行器线程的属性,如果执行器没启动,则阻塞到其启动
  */
 public final ThreadProperties threadProperties() {
     ThreadProperties threadProperties = this.threadProperties;
     if (threadProperties == null) {
         Thread thread = this.thread;
         if (thread == null) {
             assert !inEventLoop();
             submit(NOOP_TASK).syncUninterruptibly();
             thread = this.thread;
             assert thread != null;
         }
	 //构造线程属性,并更新
         threadProperties = new DefaultThreadProperties(thread);
         if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
             threadProperties = this.threadProperties;
         }
     }

     return threadProperties;
 }

再来看其他方法:
下面几个方法的思想都是首先检查线程是否在当前事件循环中,如果不在,则抛出拒绝执行器异常,否则将相应的操作委托给父类。
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    throwIfInEventLoop("invokeAny");
    return super.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    throwIfInEventLoop("invokeAny");
    return super.invokeAny(tasks, timeout, unit);
}

@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    throwIfInEventLoop("invokeAll");
    return super.invokeAll(tasks);
}

@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
    throwIfInEventLoop("invokeAll");
    return super.invokeAll(tasks, timeout, unit);
}
//如果不在当前事件循环中,则抛出拒绝执行器异常
private void throwIfInEventLoop(String method) {
    if (inEventLoop()) {
        throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
    }
}


总结:


      单线程事件执行器,执行任务,首先判断任务是否为null,为空抛出空指针异常,否则,判断线程是否在当前事件循环中,在则添加任务到任务队列,否则开启当前单线程事件执行器,并添加任务到任务队列,如果此时事件执行器已关闭,并可以移除任务,则抛出拒绝执行器任务异常;如果需要启动事件执行器唤醒线程,则添加唤醒线程到任务队列。
      添加,移除,poll任务操作,实际委托给任务队列,添加,移除hook线程操作委托给关闭hooks线程集合。
      单线程事件执行器take任务,首先从调度任务队列peek头部调度任务,如果任务不为空,则获取调度任务延时时间,如果延时时间大于0,则从任务队列超时poll任务,否则从调度任务队列抓取调度任务,添加到任务队列,并从任务队列poll任务;如果调度任务为空,则从任务队列take一个任务,如果是唤醒任务,则忽略。
      关闭单线程执行器,首先检查间隔、超时时间,时间单元参数,并且间隔时间要小于超时时间,如果已经关闭,则返回异步关闭任务结果,否则检查线程是否在当前事务循环中,如果是则更新状态为正在关闭,并计算计算关闭间隔和超时时间。


0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics