`
Donald_Draper
  • 浏览: 949342 次
社区版块
存档分类
最新评论

netty 多线程事件循环组

阅读更多
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
引言:
上一篇文章我们看了多线程事件执行器组,先来回顾一下:
     多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
     获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
本想这篇看一下多线程事件循环组呢,但是其实现了事件循环组,我们这篇先来看一下EventLoopGroup
//MultithreadEventLoopGroup
package io.netty.channel;

import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

/**
 * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 */
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {


下面来看事件循环组EventLoopGroup的定义
package io.netty.channel;

import io.netty.util.concurrent.EventExecutorGroup;

/**
 * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get
 * processed for later selection during the event loop.
 *
 */
public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * Return the next {@link EventLoop} to use
     返回下一个事件循环
     */
    @Override
    EventLoop next();

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
     * will get notified once the registration was complete.
     注册通道到事件循环,返回异步通道注册结果,当注册完成通知结果。
     */
    ChannelFuture register(Channel channel);

    /**
     * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
     * {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
     注册可写异步通道任务结果关联的通道
     */
    ChannelFuture register(ChannelPromise promise);

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
     * will get notified once the registration was complete and also will get returned.
     *注册通道到事件循环,当注册完成,通知异步通道注册结果。
     * @deprecated Use {@link #register(ChannelPromise)} instead.
     */
    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
}

从上面可以看出,事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,
事件循环组主要所做的工作为通道注册。

再来看事件循环EventLoop接口的定义:
//EventLoop
package io.netty.channel;

import io.netty.util.concurrent.OrderedEventExecutor;

/**
 * Will handle all the I/O operations for a {@link Channel} once registered.
 *
 * One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on
 * implementation details and internals.
 一个事件循环实例可以处理多个通道,这个具体要依赖于具体的实现。
 *
 */
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
   //获取事件循环所属的事件循环组
    @Override
    EventLoopGroup parent();
}
//OrderedEventExecutor
package io.netty.util.concurrent;

/**
 * Marker interface for {@link EventExecutor}s that will process all submitted tasks in an ordered / serial fashion.
 标记一个事件执行器顺序、串行的方式处理提交的任务
 */
public interface OrderedEventExecutor extends EventExecutor {
}

从上面可以看出事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。

再来看事件循环抽象实现:

package io.netty.channel;

import io.netty.util.concurrent.AbstractEventExecutor;

/**
 * Skeletal implementation of {@link EventLoop}.
 */
public abstract class AbstractEventLoop extends AbstractEventExecutor implements EventLoop {
    protected AbstractEventLoop() { }
    protected AbstractEventLoop(EventLoopGroup parent) {
        super(parent);
    }
    @Override
    public EventLoopGroup parent() {
        return (EventLoopGroup) super.parent();
    }
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
}

抽象事件循环AbstractEventLoop继承了抽象事件执行器,实现了事件循环接口。

鉴于当这样已经把事件循环和事件循环组看完,那就来看下多线程事件循环组:
package io.netty.channel;

import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;

/**
 * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at
 * the same time.
 */
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);

    private static final int DEFAULT_EVENT_LOOP_THREADS;//默认事件循环线程数

    static {
        //默认事件循环线程数为1和可用处理器数的2倍中的最大者
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    //下面的构造函数都是具体可以参考多线程事件执行器组的相应的构造
    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }
    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
     * EventExecutorChooserFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                     Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
    }

    @Override
    protected ThreadFactory newDefaultThreadFactory() {
        //创建的默认线程工厂的线程优先级默认为最大优先级
        return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
    }

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    //创建事务循环,待子类实现
    @Override
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

    @Deprecated
    @Override
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }
}

从上面可以看出,多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,
相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;
默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。


//NioEventLoopGroup
/**
 * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s.
 */
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    ...
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
}

从Nio事件循环组创建事件循环可以看出事件循环为NioEventLoop,这也就是接下来的文章要看的,先列出
Nio事件循环声明继承树。
/**
 * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
 * {@link Selector} and so does the multi-plexing of these in the event loop.
 *
 */
public final class NioEventLoop extends SingleThreadEventLoop {

/**
 * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

/**
 * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
 *
 */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {


总结:

     事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,事件循环组主要所做的工作为通道注册。
     事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。抽象事件循环AbstractEventLoop继承了抽象事件执行器AbstractEventExecutor,实现了事件循环接口。
     多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。

附:
在多线程事件循环组的静态语句中,初始化默认事件循环线程数有下面一段:
private static final int DEFAULT_EVENT_LOOP_THREADS;//默认事件循环线程数
static {
    //默认事件循环线程数为1和可用处理器数的2倍中的最大者
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

我们来看一下NettyRuntime

package io.netty.util;

import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.SystemPropertyUtil;

import java.util.Locale;

/**
 * A utility class for wrapping calls to {@link Runtime}.
 运行时包装类
 */
public final class NettyRuntime {
    //可利用处理器holder
    private static final AvailableProcessorsHolder holder = new AvailableProcessorsHolder();
    /**
     * Holder class for available processors to enable testing.
     */
    static class AvailableProcessorsHolder {
        private int availableProcessors;//可利用的处理器数量

        /**
         * Set the number of available processors.
         *设置可利用的处理器数量
         * @param availableProcessors the number of available processors
         * @throws IllegalArgumentException if the specified number of available processors is non-positive
         * @throws IllegalStateException    if the number of available processors is already configured
         */
        synchronized void setAvailableProcessors(final int availableProcessors) {
            ObjectUtil.checkPositive(availableProcessors, "availableProcessors");
            if (this.availableProcessors != 0) {
                final String message = String.format(
                        Locale.ROOT,
                        "availableProcessors is already set to [%d], rejecting [%d]",
                        this.availableProcessors,
                        availableProcessors);
                throw new IllegalStateException(message);
            }
            this.availableProcessors = availableProcessors;
        }

        /**
         * Get the configured number of available processors. The default is {@link Runtime#availableProcessors()}.
         * This can be overridden by setting the system property "io.netty.availableProcessors" or by invoking
         * {@link #setAvailableProcessors(int)} before any calls to this method.
         *
         * @return the configured number of available processors
         */
        @SuppressForbidden(reason = "to obtain default number of available processors")
        synchronized int availableProcessors() {
            if (this.availableProcessors == 0) {
	        //获取可以用的系统可用的处理器数量
                final int availableProcessors =
                        SystemPropertyUtil.getInt(
                                "io.netty.availableProcessors",
                                Runtime.getRuntime().availableProcessors());
                setAvailableProcessors(availableProcessors);
            }
            return this.availableProcessors;
        }
    }

    /**
     * Set the number of available processors.
     *设置可利用的处理器数量
     * @param availableProcessors the number of available processors
     * @throws IllegalArgumentException if the specified number of available processors is non-positive
     * @throws IllegalStateException    if the number of available processors is already configured
     */
    @SuppressWarnings("unused,WeakerAccess") // this method is part of the public API
    public static void setAvailableProcessors(final int availableProcessors) {
        holder.setAvailableProcessors(availableProcessors);
    }

    /**
     * Get the configured number of available processors. The default is {@link Runtime#availableProcessors()}. This
     * can be overridden by setting the system property "io.netty.availableProcessors" or by invoking
     * {@link #setAvailableProcessors(int)} before any calls to this method.
     *获取配置的可用处理器数量,默认的为Runtime#availableProcessors()。在调用此方法前,
     这个值可以被设置io.netty.availableProcessors属性或#setAvailableProcessors(int)重写。
     * @return the configured number of available processors
     */
    public static int availableProcessors() {
        return holder.availableProcessors();
    }

    /**
     * No public constructor to prevent instances from being created.
     */
    private NettyRuntime() {
    }
}

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics