`
Donald_Draper
  • 浏览: 950194 次
社区版块
存档分类
最新评论

netty nio事件循环后续

阅读更多
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
netty 多线程事件循环组:http://donald-draper.iteye.com/blog/2391276
netty 抽象调度事件执行器:http://donald-draper.iteye.com/blog/2391379
netty 单线程事件执行器初始化:http://donald-draper.iteye.com/blog/2391895
netty 单线程事件执行器执行任务与graceful方式关闭:http://donald-draper.iteye.com/blog/2392051
netty 单线程事件循环:http://donald-draper.iteye.com/blog/2392067
netty nio事件循环初始化:http://donald-draper.iteye.com/blog/2392161
引言:
前面一篇文章我们看了nio事件循环初始化,先来回顾一下:
    Nio事件循环内部有一个取消选择key计数器清理间隔CLEANUP_INTERVAL,用于当取消的选择key达到256个时,重置取消选择key计数器cancelledKeys(int),并重新进行选择操作;选择器自动重构阈值SELECTOR_AUTO_REBUILD_THRESHOLD,默认选择操作发生512次,用于控制当选择器发生多少次选择操作时,重构选择器;选择器状态判断器selectNowSupplier,用于获取Nio事件循环内部选择器的选择操作结果;同时有一个选择器selector,未包装过的选择器unwrappedSelector和一个选择器提供者provider,一个选择key就绪集合selectedKeys(SelectedSelectionKeySet);当选择器的选择操作阻塞时,wakenUp(AtomicBoolean)属性决定是否应该break选择操作过程;一个Nio处理Io事件的时间占比ioRatio(int),默认为50,即IO事件处理时间和其他事件处理时间各占Nio事件循环一半;一个选择策略selectStrategy用于控制选择循环,如果返回结果为-1,则下一步应该阻塞选择操作,如果返回结果为-2,则下一步应该调回IO事件循环,处理IO事件,而不是继续执行选择操作,返回值大于0,表示需要有工作要做,即注册到选择器的选择通道有IO事件就绪。
    Nio事件循环初始化,主要是将Nio事件循环组和事件执行器及任务拒绝策略传给父类单线程事件循环(单线程事件执行器),同时打开一个选择器。
    打开选择器过程,委托给选择器提供者打开一个选择器,如果需要优化选择器的,在当前线程访问控制选择下,加载选择器实现类,不初始化,如果从系统类加载器加载的选择key实现类不是Class实例,或不是裸选择器类型,不进行选择器key集合优化,及选择器为选择器提供者打开的裸选择器;否则在当前线程相同访问控制权限下,获取系统选择器实现类的,选择器就绪key集合selectedKeysField及其代理publicSelectedKeysField,设置选择器就绪key集合selectedKeysField及其代理publicSelectedKeysField访问控制权限,将系统选择器的就绪key集合selectedKeysField及其代理publicSelectedKeysField设值为selectedKeySet(SelectedSelectionKeySet),并将选择器selector包装为SelectedSelectionKeySetSelector。

今天我们接着来看nio事件循环的其他方法:
先把nio事件循环的变量声明贴出来,以便理解其他方法,
/**
 * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
 * {@link Selector} and so does the multi-plexing of these in the event loop.
 Nio单线程事件循环,注册关联通道到同一个选择器,以便复用事件循环。
 *
 */
public final class NioEventLoop extends SingleThreadEventLoop {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
    //取消选择key计数器,清理间隔,当取消的选择key达到256个时,重置计数器,并重新进行选择操作
    private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
    //是否优化选择器key集合,默认为不优化
    private static final boolean DISABLE_KEYSET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
    private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;//最小的选择器重构阈值
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;//选择器自动重构阈值,默认选择操作发生512次,重构

    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };
    //Nio事件循环任务数量Callable
    private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return NioEventLoop.super.pendingTasks();
        }
    };
    // Workaround for JDK NIO bug.
    //
    // See:
    // - http://bugs.sun.com/view_bug.do?bug_id=6427854
    // - https://github.com/netty/netty/issues/203
    static {
        //获取java的bug等级
        final String key = "sun.nio.ch.bugLevel";
        final String buglevel = SystemPropertyUtil.get(key);
        if (buglevel == null) {
	    //如果bug等级为空,则在当前任务线程相同访问控制权限下,设置nio的bug等级为空
            try {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    @Override
                    public Void run() {
                        System.setProperty(key, "");
                        return null;
                    }
                });
            } catch (final SecurityException e) {
                logger.debug("Unable to get/set System Property: " + key, e);
            }
        }
       //初始选择器重构阈值
        int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
        if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
	    //不需要重构选择器
            selectorAutoRebuildThreshold = 0;
        }
        SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
            logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
        }
    }
    /**
     * The NIO {@link Selector}.
     */
    private Selector selector;//就绪选择key集合优化后的选择器
    private Selector unwrappedSelector;//没有包装过的选择器,即选择器提供者打开的原始选择器
    private SelectedSelectionKeySet selectedKeys;//选择key就绪集合
    private final SelectorProvider provider;//选择器提供者
    /**
     * Boolean that controls determines if a blocked Selector.select should
     * break out of its selection process. In our case we use a timeout for
     * the select method and the select method will block for that time unless
     * waken up.
     当选择器的选择操作阻塞时,wakenUp属性决定是否应该break选择操作过程。在我们的实现中,
     我们给选择操作一个超时时间, 除非选择操作被wakeup,否则选择操作达到超时时间,则break选择操作。
    
     */
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private final SelectStrategy selectStrategy;//选择策略
    private volatile int ioRatio = 50;//Nio处理Io事件的时间占比,以便可以处理器其他非IO事件
    private int cancelledKeys;//取消选择key计数器
    private boolean needsToSelectAgain;//是否需要重新选择
}

我们来从nio事件循环的启动方法run开启,这个方法的声明在单线程事件执行器中,在启动执行器时,调用,忘记的,可以查阅前面的相关文章:
@Override
protected void run() {
    for (;;) {
        try {
	    //检查选择策略结果
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;//如果结果为CONTINU则,跳出当前这次事件循环,进入下一次事件循环
                case SelectStrategy.SELECT:
		    //选择操作
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //在调用选择器唤醒方法,之前,先确定wakenUp的值,以减少唤醒负载,因为
		    //唤醒选择器是一个耗时的操作。
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //但这种捷径有一个竞争条件,当wakenUp属性设值为true过早时,将会触发竞争。
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
		    //以下情况可能引起竞争
		    // 1)选择器在wakenUp属性更新为false和选择操作之间被唤醒(BAD)
                    // 2)选择器在选择操作和获取wakenUp属性之间。(OK)
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //在第一种情况下,当wakenUp属性更新为true,接下来的选择操作就会立刻被唤醒,
		    //直到在下一次循环中wakenUp属性更新为false,wakenUp.compareAndSet(false, true)
		    //将会失败,同时引起下一次不必要的选择操作阻塞。
		    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).
                    //为了修复这个问题,在选择操作后,如果wakenUp属性为true,则唤醒选择器。
		    //在上面两种情况下,这可能不是一个有效的唤醒选择器的方式。
                    if (wakenUp.get()) {
		        //如果需要唤醒选择器,则唤醒
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            //每一次选择操作,重置取消选择key计数器
            cancelledKeys = 0;
	    //置是否需要重新选择为false
            needsToSelectAgain = false;
	    //获取当前IO事件处理时间百分比
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
		    //选择器中就绪的选择key集合
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
		    //运行任务队列中的任务,这个在前面的文章已将
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
		    //超时运行任务队列,这个在前面的文章已将
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
	    //如果有异常则处理事件循环异常
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
	    //事件循环正在关闭,则反注册选择器中的选择通道,并关闭。
            if (isShuttingDown()) {
                closeAll();
		//确定事件循环关闭,这个方法在单线程事件执行器中定义
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

从上面可以看出:Nio事件循环启动后,首先选择策略根据选择器结果提供者和任务队列是否有任务生成下一步的操作策略,如果选择操作结果返回为SelectStrategy.CONTINUE,则跳出当前事件循环,如果为SelectStrategy.SELECT,则执行选择操作,并阻塞下一次选择操作,如果需要唤醒选择器,则唤醒;然后重置取消选择key计数器cancelledKeys为0,置是否需要重新选择属性needsToSelectAgain为false,然后处理选择器就绪的选择key,在根据当前IO事件处理时间百分比ioRatio,决定是执行任务队列所有任务还是超时执行任务队列任务,如果ioRatio小于100,则为超时执行任务队列中任务;最后检查事件循环是否正在关闭,是则反注册选择器中的选择通道,并关闭,确定事件循环关闭;上述整个过程在事件循环运行期间,不断地重复。

上述run方法有几点要看
1.
//选择操作
select(wakenUp.getAndSet(false));


2.
//选择器中就绪的选择key集合
processSelectedKeys();


3.
//如果有异常则处理事件循环异常
handleLoopException(t);


4.
//事件循环正在关闭,则反注册选择器中的选择通道,并关闭。
if (isShuttingDown()) {
    closeAll();
	//确定事件循环关闭,这个方法在单线程事件执行器中定义
    if (confirmShutdown()) {
        return;
    }
}

下面我们分别来看这几点:

1.
//选择操作
select(wakenUp.getAndSet(false));


private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;//选择操作计数器
        long currentTimeNanos = System.nanoTime();
	//计算选择操作延时时间
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
	        //如果延时时间已过去0.5毫秒,且选择操作计数器当前为0,第一次执行选择操作
                if (selectCnt == 0) {
		    //执行立刻执行选择操作,更新选择操作计数器
                    selector.selectNow();
                    selectCnt = 1;
                }
		//并跳出当前选择操作过程
                break;
            }

            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
	    //如果在wakeUp属性为true时,一个任务提交到事件循环,这个任务将没有机会调用选择器唤醒操作。
	    //所以在执行器选择操作之前,需要检查任务队列是否有任务。如果我们不检查,任务可能直到选择操作超时,
	    //才能被添加到任务队列。如果管道中有空闲状态处理Handler,任务也许会在空闲状态超时的时候,添加到任务队列。
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
	        //如果任务队列中有任务,且wakeUp属性为false,并更新为true成功,则立刻执行选择操作,更新选择操作计数器
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            //执行超时选择操作
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;//更新选择操作计数器
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
		//如果有选择key就绪,或原始wakeUp属性为true,或当前wakeUp属性为true,或任务队列有任务,或调度任务队列有调度任务,
		//则跳出当前选择操作
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
		//当线程处于中断状态,重置选择key集合,以免加重事务循环的负载。这种情况是由于用户Handler
		//bug或客户端lib引起的,我们log记录
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
		//超时时间已过
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
		//如果选择器重构阈值大于0,且当前选择操作计数器的值大于阈值,则重新构造选择器
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                        selectCnt, selector);
                //重构选择器
                rebuildSelector();
                selector = this.selector;
                //重新执行选择操作
                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
	    //如果选择操作计数器的值,大于最小选择器重构阈值,则输出log
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
        // Harmless exception - log anyway
    }
}

我们来看重构选择器:
//重构选择器
rebuildSelector();


/**
 * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
 * around the infamous epoll 100% CPU bug.
 在CPU接近100%负载情况下,重新创建一个选择器替代当前事件循环中的选择器
 */
public void rebuildSelector() {
    //如果线程不在当前事件循环中,创建一个线程完成重构任务
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    //否则,直接重构
    rebuildSelector0();
}
//重新创建一个选择器,替代当前事件循环中的选择器
private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {
        原始选择器为空,则直接返回
        return;
    }

    try {
        //重新打开一个选择器
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    //遍历注册到选择器的选择key集合
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
	       //如果选择key无效或选择关联的通道已经注册到新的选择器,则跳出当前循环
                continue;
            }
            //获取key的选择关注事件集
            int interestOps = key.interestOps();
            key.cancel();//取消选择key
	    //注册选择key到新的选择器
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
		//如果是nio通道,则更新通道的选择key
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
	       //出现异常,则关闭通道
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
		//否则反注册通道
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }
   //更新当前事件循环选择器
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
	//关闭原始选择器
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}


从上面可以看出,选择方法,首先重置选择操作计数器为0,计算选择操作延时时间;
如果延时时间已过去0.5毫秒,且选择操作计数器当前为0,即第一次执行选择操作,
执行立刻执行选择操作,更新选择操作计数器,并跳出当前选择操作过程;如果任务队列中有任务,且wakeUp属性为false,并更新为true成功,则立刻执行选择操作,更新选择操作计数器,跳出当前选择操作过程;如果上面两种情况都不是,则执行超时选择操作,如果有选择key就绪,或原始wakeUp属性为true,或当前wakeUp属性为true,或任务队列有任务,或调度任务队列有调度任务,则跳出当前选择操作;如果选择器重构阈值大于0,且当前选择操作计数器的值大于阈值,则重新构造选择器,即创建新的选择器将原始选择器关联的选择key,注册到新的选择器中。

我们在简单看一下,重构选择器异常处理,我们来看通道非nio通道的情况的处理:
//否则反注册通道
 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
 invokeChannelUnregistered(task, key, e);


private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
    try {
        task.channelUnregistered(k.channel(), cause);
    } catch (Exception e) {
        logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
    }
}

//NioTask
package io.netty.channel.nio;

import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

/**
 * An arbitrary task that can be executed by {@link NioEventLoop} when a {@link SelectableChannel} becomes ready.
 *
 * @see NioEventLoop#register(SelectableChannel, int, NioTask)
 */
public interface NioTask<C extends SelectableChannel> {
    /**
     * Invoked when the {@link SelectableChannel} has been selected by the {@link Selector}.
     当选择通道被选择器选中时,调用
     */
    void channelReady(C ch, SelectionKey key) throws Exception;

    /**
     * Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus
     * this {@link NioTask} will not be notified anymore.
     *当通道的选择key取消时,调用,不会通知任何线程
     * @param cause the cause of the unregistration. {@code null} if a user called {@link SelectionKey#cancel()} or
     *              the event loop has been shut down.
     */
    void channelUnregistered(C ch, Throwable cause) throws Exception;
}

再来看第二点
2.
//选择器中就绪的选择key集合
processSelectedKeys();


private void processSelectedKeys() {
    if (selectedKeys != null) {
        //如果选择key集合不为空,则使用优化方式处理选择器就绪key集合
        processSelectedKeysOptimized();
    } else {
       //默认处理就绪选择key的方式
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

由于nio事件循环默认是不优化选择器的选择key集合,所以先来看一下默认处理方式:
//默认处理就绪选择key的方式
processSelectedKeysPlain(selector.selectedKeys());

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    //检查就绪选择key集合是否为空,如果为空,直接返回,以便产生垃圾
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        //就绪选择key
        final SelectionKey k = i.next();
	//获取选择key附加物
        final Object a = k.attachment();
	//移除选择key
        i.remove();
        if (a instanceof AbstractNioChannel) {
	    //如果是nio通道,则委托给processSelectedKey
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
	    //否则委托给Nio任务
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
       //就绪选择key集合遍历完,跳出循环
        if (!i.hasNext()) {
            break;
        }
       //如果需要重新执行选择操作
        if (needsToSelectAgain) {
	    //则重新执行选择操作,重新遍历就绪选择key集合
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
	    //重新创建选择器,要避免并发修改异常
            if (selectedKeys.isEmpty()) {
                break;
            } else {
	        //重置就绪选择key集合迭代器
                i = selectedKeys.iterator();
            }
        }
    }
}

默认处理选择key我们有以下几点要看
a.
if (a instanceof AbstractNioChannel) {
    //如果是nio通道,则委托给processSelectedKey
    processSelectedKey(k, (AbstractNioChannel) a);
} 

b.
else {
    @SuppressWarnings("unchecked")
    //否则委托给Nio任务
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
}

c.
//如果需要重新执行选择操作
if (needsToSelectAgain) {
    //则重新执行选择操作,重新遍历就绪选择key集合
    selectAgain();
    selectedKeys = selector.selectedKeys();

    // Create the iterator again to avoid ConcurrentModificationException
    //重新创建选择器,要避免并发修改异常
    if (selectedKeys.isEmpty()) {
        break;
    } else {
        //重置就绪选择key集合迭代器
        i = selectedKeys.iterator();
    }
}

下面分别来看这几点:
a.
if (a instanceof AbstractNioChannel) {
    //如果是nio通道,则委托给processSelectedKey
    processSelectedKey(k, (AbstractNioChannel) a);
} 


private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
     final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
     if (!k.isValid()) {//首先选择key当前必须有效
         final EventLoop eventLoop;
         try {
	     //获取选择通道事件循环
             eventLoop = ch.eventLoop();
         } catch (Throwable ignored) {
             // If the channel implementation throws an exception because there is no event loop, we ignore this
             // because we are only trying to determine if ch is registered to this event loop and thus has authority
             // to close ch.
             return;
         }
         // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
         // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
         // still healthy and should not be closed.
         // See https://github.com/netty/netty/issues/5125
         if (eventLoop != this || eventLoop == null) {
	     //如果选择通道不属于当前事件循环,则直接返回
             return;
         }
         // close the channel if the key is not valid anymore
         unsafe.close(unsafe.voidPromise());
         return;
     }

     try {
          //获取选择key的就绪事件
         int readyOps = k.readyOps();
         // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
         // the NIO JDK channel implementation may throw a NotYetConnectedException.
         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
             // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
             // See https://github.com/netty/netty/issues/924
	     //如果事件连接操作,则完成通道连接,移除连接事件
             int ops = k.interestOps();
             ops &= ~SelectionKey.OP_CONNECT;
             k.interestOps(ops);

             unsafe.finishConnect();
         }

         // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
         if ((readyOps & SelectionKey.OP_WRITE) != 0) {
             // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
	     //如果是写事件,则刷新写请求队列,释放内存
             ch.unsafe().forceFlush();
         }

         // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
         // to a spin loop
         if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
	    //如果是读事件,则委托给通道的Unsafe
             unsafe.read();
         }
     } catch (CancelledKeyException ignored) {
         unsafe.close(unsafe.voidPromise());
     }
 }

从上面可以看出,处理就绪的选择key,首先选择key当前必须有效,判断选择key通道事件循环是否是当前循环,否则直接返回,是则判断就绪key的就绪事件是连接请求事件,写事件还是读事件,如果事件连接操作,则委托通道的Unsafe完成通道连接,并移除连接事件;如果是如果是写事件,则委托通道的Unsafe刷新写请求队列,释放内存;如果是读事件,则委托给通道的Unsafe的read方法;

这里我们先来理一下,Nio事件循环是单线程事件循环,即单线程事件执行器,在处理选择器的就绪选择key时,当且仅当,就绪选择key关联通道所在的事件循环为当前事件循环时,才出来就绪选择key关联通道的就绪IO事件,从而保证通道的读写等操作线程安全。
b.
else {
    @SuppressWarnings("unchecked")
    //否则委托给Nio任务
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
}

//非Nio通道处理方式
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
        int state = 0;
        try {
	    //委托给nio任务的channelReady
            task.channelReady(k.channel(), k);
            state = 1;
        } catch (Exception e) {
            k.cancel();
            invokeChannelUnregistered(task, k, e);
            state = 2;
        } finally {
            switch (state) {
            case 0:
                k.cancel();
                invokeChannelUnregistered(task, k, null);
                break;
            case 1:
                if (!k.isValid()) { // Cancelled by channelReady()
                    invokeChannelUnregistered(task, k, null);
                }
                break;
            }
        }
    }

c.
//如果需要重新执行选择操作
if (needsToSelectAgain) {
    //则重新执行选择操作,重新遍历就绪选择key集合
    selectAgain();
    selectedKeys = selector.selectedKeys();

    // Create the iterator again to avoid ConcurrentModificationException
    //重新创建选择器,要避免并发修改异常
    if (selectedKeys.isEmpty()) {
        break;
    } else {
        //重置就绪选择key集合迭代器
        i = selectedKeys.iterator();
    }
}

这一点我只需要关注重新选择操作:
//很简单,不必多说
private void selectAgain() {
    needsToSelectAgain = false;
    try {
        selector.selectNow();
    } catch (Throwable t) {
        logger.warn("Failed to update SelectionKeys.", t);
    }
}


从上面可以看出:默认的处理选择器就绪选择key集合过程,为遍历选择key集合,
处理就绪的选择key,首先选择key当前必须有效,再判断选择key通道事件循环是否是
当前循环,否则直接返回,是则判断就绪key的就绪事件是连接请求事件,写事件还是读事件,如果事件连接操作,则委托通道的Unsafe完成通道连接,并移除连接事件;
如果是如果是写事件,则委托通道的Unsafe刷新写请求队列,释放内存;
如果是读事件,则委托给通道的Unsafe的read方法;如果在处理就绪选择key的过程,需要重新执行选择操作,则立刻执行,并更新当前就绪选择key集合及其迭代器。

Nio事件循环是单线程事件循环,即单线程事件执行器,在处理选择器的就绪选择key时,
当且仅当,就绪选择key关联通道所在的事件循环为当前事件循环时,才出来就绪选择key关联通道的就绪IO事件,从而保证通道的读写等操作线程安全。

再来看优化方式:
//如果选择key集合不为空,则使用优化方式处理选择器就绪key集合
processSelectedKeysOptimized();

//优化处理方式
private void processSelectedKeysOptimized() {
    //遍历就绪选择key集合
    for (int i = 0; i < selectedKeys.size; ++i) {
        //大部分相同,我们来看不同
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
	//取得就绪选择key后,置空就绪选择key集合中对应的索引位置,以便gc回收关闭的通道
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
	    //置空就绪选择key所有元素
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}


//SelectedSelectionKeySet
void reset(int start) {
    Arrays.fill(keys, start, size, null);
    size = 0;
}



3.
//如果有异常则处理事件循环异常
handleLoopException(t);


//处理事件循环异常
private static void handleLoopException(Throwable t) {
    logger.warn("Unexpected exception in the selector loop.", t);

    // Prevent possible consecutive immediate failures that lead to
    // excessive CPU consumption.
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // Ignore.
    }
}


4.
//事件循环正在关闭,则反注册选择器中的选择通道,并关闭。
if (isShuttingDown()) {
    closeAll();
	//确定事件循环关闭,这个方法在单线程事件执行器中定义
    if (confirmShutdown()) {
        return;
    }
}

这一点我们来看closeAll方法:
private void closeAll() {
    //重新执行选择操作
    selectAgain();
    //获取注册到选择器的选择key
    Set<SelectionKey> keys = selector.keys();
    Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
    for (SelectionKey k: keys) {
        Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            channels.add((AbstractNioChannel) a);
        } else {
	   //非nio通道,委托给nio任务
            k.cancel();
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            invokeChannelUnregistered(task, k, null);
        }
    }
    //遍历注册到选择器的通道集
    for (AbstractNioChannel ch: channels) {
       //关闭通道
        ch.unsafe().close(ch.unsafe().voidPromise());
    }
}

从上面可以看出,在事件循环的每次处理过程中,在最后都要检查意见事件循环是否关闭,如果正在关闭,则关闭注册到选择器的所有通道,并确保事件循环关闭。

在nio事件循环的其他方法:
//取消选择key
 void cancel(SelectionKey key) {
    key.cancel();
    cancelledKeys ++;
    if (cancelledKeys >= CLEANUP_INTERVAL) {
        //如果取消的选择key大于清理间隔阈值,则重置取消key计数器,并重新执行选择操作属性为true
        cancelledKeys = 0;
        needsToSelectAgain = true;
    }
}

//poll任务
@Override
protected Runnable pollTask() {
    Runnable task = super.pollTask();
    if (needsToSelectAgain) {
        selectAgain();
    }
    return task;
}
//关闭选择器
@Override
protected void cleanup() {
    try {
        selector.close();
    } catch (IOException e) {
        logger.warn("Failed to close a selector.", e);
    }
}
//获取事件循环任务队列中的任务数量
@Override
public int pendingTasks() {
    // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
    // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
    // See https://github.com/netty/netty/issues/5297
    if (inEventLoop()) {
       //线程在当前事件循环,则委托给父类
        return super.pendingTasks();
    } else {
        //否则委托给pendingTasksCallable任务
        return submit(pendingTasksCallable).syncUninterruptibly().getNow();
    }
}
//注册通道到选择器
 /**
  * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
  * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
  * be executed by this event loop when the {@link SelectableChannel} is ready.
  */
 public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
     if (ch == null) {
         throw new NullPointerException("ch");
     }
     if (interestOps == 0) {
         throw new IllegalArgumentException("interestOps must be non-zero.");
     }
     //关系兴趣事件,必须为通道支持
     if ((interestOps & ~ch.validOps()) != 0) {
         throw new IllegalArgumentException(
                 "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
     }
     if (task == null) {
         throw new NullPointerException("task");
     }

     if (isShutdown()) {
         throw new IllegalStateException("event loop shut down");
     }

     try {
         //委托给通道
         ch.register(selector, interestOps, task);
     } catch (Exception e) {
         throw new EventLoopException("failed to register a channel", e);
     }
 }
//Io事件在事件循环中的处理时间百分比
/**
 * Returns the percentage of the desired amount of time spent for I/O in the event loop.
 */
public int getIoRatio() {
    return ioRatio;
}

/**
 * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
 * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
 */
public void setIoRatio(int ioRatio) {
    if (ioRatio <= 0 || ioRatio > 100) {
        throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
    }
    this.ioRatio = ioRatio;
}
//创建任务队列
 @Override
 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
     // This event loop never calls takeTask()
     //这个现在先不说,以后有机会单独讲
     return PlatformDependent.newMpscQueue(maxPendingTasks);
 }

总结:

    Nio事件循环启动后,首先选择策略根据选择器结果提供者和任务队列是否有任务生成下一步的操作策略,如果选择操作结果返回为SelectStrategy.CONTINUE,则跳出当前事件循环,如果为SelectStrategy.SELECT,则执行选择操作,并阻塞下一次选择操作,如果需要唤醒选择器,则唤醒;然后重置取消选择key计数器cancelledKeys为0,置是否需要重新选择属性needsToSelectAgain为false,然后处理选择器就绪的选择key,在根据当前IO事件处理时间百分比ioRatio,决定是执行任务队列所有任务还是超时执行任务队列任务,如果ioRatio小于100,则为超时执行任务队列中任务;最后检查事件循环是否正在关闭,是则反注册选择器中的选择通道,并关闭,确定事件循环关闭;上述整个过程在事件循环运行期间,不断地重复。在事件循环的每次处理过程中,在最后都要检查意见事件循环是否关闭,如果正在关闭,则关闭注册到选择器的所有通道,并确保事件循环关闭。
    选择操作的过程为,首先重置选择操作计数器为0,计算选择操作延时时间;如果延时时间已过去0.5毫秒,且选择操作计数器当前为0,即第一次执行选择操作,执行立刻执行选择操作,更新选择操作计数器,并跳出当前选择操作过程;如果任务队列中有任务,且wakeUp属性为false,并更新为true成功,则立刻执行选择操作,更新选择操作计数器,跳出当前选择操作过程;如果上面两种情况都不是,则执行超时选择操作,如果有选择key就绪,或原始wakeUp属性为true,或当前wakeUp属性为true,或任务队列有任务,或调度任务队列有调度任务,则跳出当前选择操作;如果选择器重构阈值大于0,且当前选择操作计数器的值大于阈值,则重新构造选择器,即创建新的选择器将原始选择器关联的选择key,注册到新的选择器中。
    默认的处理选择器就绪选择key集合过程,为遍历选择key集合,处理就绪的选择key,首先选择key当前必须有效,再判断选择key通道事件循环是否是当前循环,否则直接返回,是则判断就绪key的就绪事件是连接请求事件,写事件还是读事件,如果事件连接操作,则委托通道的Unsafe完成通道连接,并移除连接事件;如果是如果是写事件,则委托通道的Unsafe刷新写请求队列,释放内存;果是读事件,则委托给通道的Unsafe的read方法;如果在处理就绪选择key的过程,需要重新执行选择操作,则立刻执行,并更新当前就绪选择key集合及其迭代器。
    Nio事件循环内部有一个选择器,所有注册到选择器的通道都在一个事件循环中,Nio事件循环是单线程事件循环,即单线程事件执行器,在处理选择器的就绪选择key时,当且仅当,就绪选择key关联通道所在的事件循环为当前事件循环时,才出来就绪选择key关联通道的就绪IO事件,从而保证通道的读写等操作线程安全。
    Nio事件循环实际的工作就是执行选择操作,并处理选择器的就绪选择key,Nio事件循环与Mina的IoProcessor有点相似,都可以看着一个线程执行器,执行通道的IO事件操作,而不同的是Nio管理的是选择器Selector,而Mina的IoProcessor管理的会话IoSession。

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics