- 浏览: 950616 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
引言:
前面一篇文章我们看了事件执行器组和事件执行器的接口定义,先来回顾一下:
事件循环组EventLoopGroup为一个特殊的事件执行器组EventExecutorGroup,可以注册通道,以便在事件循环中,被后面的选择操作处理器。事件执行器组继承了JUC的调度执行器服务ScheduledExecutorService,用迭代器Iterable<EventExecutor>管理组内的事件执行器。事件执行器是一个特殊的事件执行器组。Nio多线程事件循环NioEventLoopGroup可以理解为多线程版MultithreadEventExecutorGroup的事件执行器组。
事件执行器组EventExecutorGroup主要提供了关闭事件执行器组管理的执行器的相关方法,获取事件执行器组管理的事件执行器和执行任务线程方法。
事件执行器EventExecutor为一个特殊的事件执行器组EventExecutorGroup,提供了获取事件执行器组的下一个事件执行器方法,判断线程是否在当前事件循环中以及创建可写的异步任务结果和进度结果,及已经成功失败的异步结果。
抽象事件执行器组AbstractEventExecutorGroup,所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器组的下一个事件执行器相应方法执行。graceful方式关闭事件执行器组,默认关闭间隔为2s,超时时间为25s,具体定义在抽象事件执行器AbstractEventExecutor中。
抽象事件执行器,继承了抽象执行器服务AbstractExecutorService,提交任务线程,直接委托给父类抽象执行器服务,不支持延时调度的周期间歇性调度任务线程,多个一个安全地执行给定任务线程方法,捕捉执行过程中抛出的异常。由于抽象的事件执行器是一个特殊的事件执行器组,内部事件执行器selfCollection(Collections.<EventExecutor>singleton(this)),是自己单例集,next方法返回的是自己。
我们先把Nio事件循环继承树结构列一下:
事件循环组EventLoopGroup和事件执行器组EventExecutorGroup及事件执行器EventExecutor的关系;
调度执行器ScheduledExecutorService为JUC包中的执行器服务,用迭代器Iterable<EventExecutor>管理组内的
事件执行器。
再来看事件循环组的另一个分支EventLoopGroup
今天我们来看一下多线程事件执行器组:
从上面来看,多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。
我们来看一下事件执行器选择器的定义:
其定义在选择器工厂EventExecutorChooserFactory中
在选择器工厂定义中有一个注解我们来看一下:
回到事件执行器选择工厂的默认实现:
从上面可以看出当,当执行器的数量为2的幂次方时,使用的事件执行器选择器为PowerOfTwoEventExecutorChooser,
否则为GenericEventExecutorChooser。
回到多线程事件执行器组的构造:
从上面的构造函数来看,事件执行器选择器默认为DefaultEventExecutorChooserFactory,然后委托给
再看另外一个种构造:
来看任务线程执行器ThreadPerTaskExecutor
从上面来看默认的线程池执行器为ThreadPerTaskExecutor。
来看实际构造
构造方法中有一个点我们需要关注,创建默认线程池工厂:
默认线程工厂类,我们需要关注的只有以下一点:
//InternalThreadLocalMap,看看即,后具体涉及再看
从上面可以看出,构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
回到多线程执行器组的关闭,超时等待方Terminated法:
从上面可以看出获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
总结:
多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
引言:
前面一篇文章我们看了事件执行器组和事件执行器的接口定义,先来回顾一下:
事件循环组EventLoopGroup为一个特殊的事件执行器组EventExecutorGroup,可以注册通道,以便在事件循环中,被后面的选择操作处理器。事件执行器组继承了JUC的调度执行器服务ScheduledExecutorService,用迭代器Iterable<EventExecutor>管理组内的事件执行器。事件执行器是一个特殊的事件执行器组。Nio多线程事件循环NioEventLoopGroup可以理解为多线程版MultithreadEventExecutorGroup的事件执行器组。
事件执行器组EventExecutorGroup主要提供了关闭事件执行器组管理的执行器的相关方法,获取事件执行器组管理的事件执行器和执行任务线程方法。
事件执行器EventExecutor为一个特殊的事件执行器组EventExecutorGroup,提供了获取事件执行器组的下一个事件执行器方法,判断线程是否在当前事件循环中以及创建可写的异步任务结果和进度结果,及已经成功失败的异步结果。
抽象事件执行器组AbstractEventExecutorGroup,所有与调度执行器关联的提交任务和调度任务方法,直接委托给事件执行器组的下一个事件执行器相应方法执行。graceful方式关闭事件执行器组,默认关闭间隔为2s,超时时间为25s,具体定义在抽象事件执行器AbstractEventExecutor中。
抽象事件执行器,继承了抽象执行器服务AbstractExecutorService,提交任务线程,直接委托给父类抽象执行器服务,不支持延时调度的周期间歇性调度任务线程,多个一个安全地执行给定任务线程方法,捕捉执行过程中抛出的异常。由于抽象的事件执行器是一个特殊的事件执行器组,内部事件执行器selfCollection(Collections.<EventExecutor>singleton(this)),是自己单例集,next方法返回的是自己。
我们先把Nio事件循环继承树结构列一下:
事件循环组EventLoopGroup和事件执行器组EventExecutorGroup及事件执行器EventExecutor的关系;
/** * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s. Nio事件循环组NioEventLoopGroup为多线程事件循环组的事件,主要用于基于通道的Nio选择器相关操作。 */ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/** * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at * the same time. 多线程事件循环组MultithreadEventLoopGroup为事件循环组的实现,可以在同一时间多线程处理任务。 */ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
/** * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at * the same time. 多线程事件执行器组MultithreadEventExecutorGroup可以在同一时间多线程处理任务。 */ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
/** * Abstract base class for {@link EventExecutorGroup} implementations. 事件循环中的抽象实现 */ public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
/** * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use * via its {@link #next()} method. Besides this, it is also responsible for handling their * life-cycle and allows shutting them down in a global fashion. 事件执行器组通道next方法提供事件执行器。除此之外,负责他们的生命循环,并允许以全局的方式关闭 * */ public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
调度执行器ScheduledExecutorService为JUC包中的执行器服务,用迭代器Iterable<EventExecutor>管理组内的
事件执行器。
/** * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes * with some handy methods to see if a {@link Thread} is executed in a event loop. * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic * way to access methods. 事件执行器EventExecutor是一个特殊的事件执行器组,如果线程在事件循环中执行,事件执行器可以处理 相关的操作。除此之外,拓展了事件执行器组的相关方法,可以用一般的方式访问事件执行器组的相关方法。 * */ public interface EventExecutor extends EventExecutorGroup {
再来看事件循环组的另一个分支EventLoopGroup
/** * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get * processed for later selection during the event loop. 事件循环组为一个特殊的事件执行器组,可以注册通道,以便在事件循环中,被后面的选择操作处理器。 * */ public interface EventLoopGroup extends EventExecutorGroup {
今天我们来看一下多线程事件执行器组:
/** * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at * the same time. 多线程事件执行器组MultithreadEventExecutorGroup可以在同一时间多线程处理任务。 */ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { private final EventExecutor[] children;//组内的事件执行器 private final Set<EventExecutor> readonlyChildren;//组内事件执行器集的可读包装集 private final AtomicInteger terminatedChildren = new AtomicInteger();//已关闭的事件执行器数 //termination异步任务结果 private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); private final EventExecutorChooserFactory.EventExecutorChooser chooser;//事件执行器选择器 }
从上面来看,多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。
我们来看一下事件执行器选择器的定义:
其定义在选择器工厂EventExecutorChooserFactory中
package io.netty.util.concurrent; import io.netty.util.internal.UnstableApi; /** * Factory that creates new {@link EventExecutorChooser}s. */ @UnstableApi public interface EventExecutorChooserFactory { /** * Returns a new {@link EventExecutorChooser}. 创建一个事件执行器选择器 */ EventExecutorChooser newChooser(EventExecutor[] executors); /** * Chooses the next {@link EventExecutor} to use. */ @UnstableApi interface EventExecutorChooser { /** * Returns the new {@link EventExecutor} to use. 返回新创建的事件执行功能器 */ EventExecutor next(); } }
在选择器工厂定义中有一个注解我们来看一下:
package io.netty.util.internal; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * Indicates a public API that can change at any time (even in minor/bugfix releases). *预示着公用的API可能随时改变 * Usage guidelines: *使用引导 * [list=1] * [*]Is not needed for things located in *.internal.* packages * [*]Only public accessible classes/interfaces must be annotated 公有方法方式必须注解 * <li>If this annotation is not present the API is considered stable and so no backward compatibility can be * broken in a non-major release!</li> 此注解不表示API不稳定,而是在一个非主线释放版本中,不向后兼容。 * [/list] */ @Retention(RetentionPolicy.SOURCE) @Target({ ElementType.ANNOTATION_TYPE, ElementType.CONSTRUCTOR, ElementType.FIELD, ElementType.METHOD, ElementType.PACKAGE, ElementType.TYPE }) @Documented public @interface UnstableApi { }
回到事件执行器选择工厂的默认实现:
package io.netty.util.concurrent; import io.netty.util.internal.UnstableApi; import java.util.concurrent.atomic.AtomicInteger; /** * Default implementation which uses simple round-robin to choose next {@link EventExecutor}. */ @UnstableApi public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { //默认事件执行器选择器工程实例 public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @SuppressWarnings("unchecked") @Override //根据执行器的数量是否为2的幂次方决定使用PowerOfTwoEventExecutorChooser还是GenericEventExecutorChooser public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } //判断执行器数是否为2的幂次方 private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } //2的幂次方事件执行器选择器 private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } //一般事件执行器选择器 private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } //获取内部执行器id对应的事件执行器 @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } }
从上面可以看出当,当执行器的数量为2的幂次方时,使用的事件执行器选择器为PowerOfTwoEventExecutorChooser,
否则为GenericEventExecutorChooser。
回到多线程事件执行器组的构造:
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. 线程数 * @param executor the Executor to use, or {@code null} if the default should be used. 执行器 * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call 传递给创建事件执行器的参数 */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }
从上面的构造函数来看,事件执行器选择器默认为DefaultEventExecutorChooserFactory,然后委托给
MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args)
再看另外一个种构造:
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args); }
来看任务线程执行器ThreadPerTaskExecutor
package io.netty.util.concurrent; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }
从上面来看默认的线程池执行器为ThreadPerTaskExecutor。
来看实际构造
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param chooserFactory the {@link EventExecutorChooserFactory} to use. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //检查线程数参数 if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } //如果执行器不为空,则初始化线程执行器的线程工厂 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //创建事件执行器集 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //创建事件执行器,newChild待子类实现 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { //初始化失败,则关闭事件执行器 for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } //Terminated事件执行器 for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } //初始化事件执行器选择器 chooser = chooserFactory.newChooser(children); //创建terminated事件执行器监听器 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; //添加terminated事件执行器监听器到terminated异步任务结果 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } //包装事件执行器集为只读 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } }
构造方法中有一个点我们需要关注,创建默认线程池工厂:
protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass()); }
package io.netty.util.concurrent; import io.netty.util.internal.StringUtil; import java.util.Locale; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * A {@link ThreadFactory} implementation with a simple naming rule. */ public class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolId = new AtomicInteger(); private final AtomicInteger nextId = new AtomicInteger();//线程id生成器 private final String prefix;//线程名前缀 private final boolean daemon;//是否为守候模式 private final int priority;//线程有限级 protected final ThreadGroup threadGroup;//线程组 public DefaultThreadFactory(Class<?> poolType) { this(poolType, false, Thread.NORM_PRIORITY); } public DefaultThreadFactory(String poolName) { this(poolName, false, Thread.NORM_PRIORITY); } public DefaultThreadFactory(Class<?> poolType, boolean daemon) { this(poolType, daemon, Thread.NORM_PRIORITY); } public DefaultThreadFactory(String poolName, boolean daemon) { this(poolName, daemon, Thread.NORM_PRIORITY); } public DefaultThreadFactory(Class<?> poolType, int priority) { this(poolType, false, priority); } public DefaultThreadFactory(String poolName, int priority) { this(poolName, false, priority); } public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) { this(toPoolName(poolType), daemon, priority); } //获取线程池名称 public static String toPoolName(Class<?> poolType) { if (poolType == null) { throw new NullPointerException("poolType"); } String poolName = StringUtil.simpleClassName(poolType); switch (poolName.length()) { case 0: return "unknown"; case 1: return poolName.toLowerCase(Locale.US); default: if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) { return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1); } else { return poolName; } } } public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { if (poolName == null) { throw new NullPointerException("poolName"); } if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException( "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)"); } //线程名默认为线程池名+id prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; this.priority = priority; this.threadGroup = threadGroup; } public DefaultThreadFactory(String poolName, boolean daemon, int priority) { //如果系统安全管理器为空,则线程组为当前线程所属组,否则系统安全管理获取线程组信息 this(poolName, daemon, priority, System.getSecurityManager() == null ? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup()); } @Override public Thread newThread(Runnable r) { Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; } //包装线程为FastThreadLocalThread protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); } //线程包装类 private static final class DefaultRunnableDecorator implements Runnable { private final Runnable r; DefaultRunnableDecorator(Runnable r) { this.r = r; } @Override public void run() { try { r.run(); } finally { //移除所有线程级的变量 FastThreadLocal.removeAll(); } } } }
默认线程工厂类,我们需要关注的只有以下一点:
//包装线程为FastThreadLocalThread protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(threadGroup, r, name); }
package io.netty.util.concurrent; import io.netty.util.internal.InternalThreadLocalMap; /** * A special {@link Thread} that provides fast access to {@link FastThreadLocal} variables. */ public class FastThreadLocalThread extends Thread { private InternalThreadLocalMap threadLocalMap;//线程本地变量Map public FastThreadLocalThread() { } public FastThreadLocalThread(Runnable target) { super(target); } public FastThreadLocalThread(ThreadGroup group, Runnable target) { super(group, target); } public FastThreadLocalThread(String name) { super(name); } public FastThreadLocalThread(ThreadGroup group, String name) { super(group, name); } public FastThreadLocalThread(Runnable target, String name) { super(target, name); } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) { super(group, target, name); } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, target, name, stackSize); } /** * Returns the internal data structure that keeps the thread-local variables bound to this thread. * Note that this method is for internal use only, and thus is subject to change at any time. 返回绑定的当前线程的线程本地变量Map */ public final InternalThreadLocalMap threadLocalMap() { return threadLocalMap; } /** * Sets the internal data structure that keeps the thread-local variables bound to this thread. * Note that this method is for internal use only, and thus is subject to change at any time. */ public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { this.threadLocalMap = threadLocalMap; } }
//InternalThreadLocalMap,看看即,后具体涉及再看
/** * The internal data structure that stores the thread-local variables for Netty and all {@link FastThreadLocal}s. * Note that this class is for internal use only and is subject to change at any time. Use {@link FastThreadLocal} * unless you know what you are doing. 线程本地变量Map */ public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8; public static final Object UNSET = new Object(); ... }
从上面可以看出,构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
回到多线程执行器组的关闭,超时等待方Terminated法:
@Override public EventExecutor next() { //委托给事件执行器选择器 return chooser.next(); } @Override public Iterator<EventExecutor> iterator() { //获取只读执行器集的迭代器 return readonlyChildren.iterator(); } /** * Return the number of {@link EventExecutor} this implementation uses. This number is the maps * 1:1 to the threads it use. 返回管理的事件执行器数量 */ public final int executorCount() { return children.length; } /** * Create a new EventExecutor which will later then accessible via the {@link #next()} method. This method will be * called for each thread that will serve this {@link MultithreadEventExecutorGroup}. *创建一个事件执行器,创建后,可以通过next方法访问。此方法由服务多线程事件执行器组线程调用, 待子类实现 */ protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception; //关闭事件执行器组 @Override public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { //遍历管理的事件执行器集,关闭执行器 for (EventExecutor l: children) { l.shutdownGracefully(quietPeriod, timeout, unit); } //返回异步关闭事件执行器组任务结果 return terminationFuture(); } @Override public Future<?> terminationFuture() { return terminationFuture; } @Override @Deprecated public void shutdown() { for (EventExecutor l: children) { l.shutdown(); } } //所有执行器组内的事件执行器正在关闭,才返回true @Override public boolean isShuttingDown() { for (EventExecutor l: children) { if (!l.isShuttingDown()) { return false; } } return true; } //所有执行器组内的事件执行器已关闭,才返回true @Override public boolean isShutdown() { for (EventExecutor l: children) { if (!l.isShutdown()) { return false; } } return true; } //所有执行器组内的事件执行器已Terminated,才返回true @Override public boolean isTerminated() { for (EventExecutor l: children) { if (!l.isTerminated()) { return false; } } return true; } //超时等待Terminated执行器组 @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long deadline = System.nanoTime() + unit.toNanos(timeout); loop: for (EventExecutor l: children) {//遍历事件执行器组 for (;;) { long timeLeft = deadline - System.nanoTime(); if (timeLeft <= 0) {//超时等待时间耗完,则停止Terminated执行器组 break loop; } //否则,超时剩余等待时间timeLeft,Terminated事件执行器 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) { break; } } } return isTerminated(); }
从上面可以看出获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
总结:
多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1231netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 1970netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2360netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1255netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1252netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1518netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1758netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1325netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2748netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2094netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 1938netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1012netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1806netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1158netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1140netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 890netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1235netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2124netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 992netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1788netty 管道线定义-ChannelPipeline:htt ...
相关推荐
8_Netty多客户端连接与通信 9_Netty读写检测机制与长连接要素 10_Netty对WebSocket的支援 11_Netty实现服务器端与客户端的长连接通信 12_Google Protobuf详解 13_定义Protobuf文件及消息详解 14_Protobuf完整实例...
第8讲:Netty多客户端连接与通信 第9讲:Netty读写检测机制与长连接要素 第10讲:Netty对WebSocket的支援 第11讲:Netty实现服务器端与客户端的长连接通信 第12讲:Google Protobuf详解 第13讲:定义Protobuf...
7_Netty的Socket编程详解 8_Netty多客户端连接与通信 9_Netty读写检测机制与长连接要素 10_Netty对WebSocket的支援 11_Netty实现服务器端与客户端的长连接通信 12_Google Protobuf详解 13_定义Protobuf文件及消息...
多线程支持: Java内置了对多线程的支持,允许程序同时执行多个任务。这对于开发需要高并发性能的应用程序(如服务器端应用、网络应用等)非常重要。 自动内存管理(垃圾回收): Java具有自动内存管理机制,通过...
NioEventLoopGroup的类图如下: 各接口功能说明 Executor接口:任务执行器 ExecutorService接口:提供了...MultithreadEventExecutorGroup:基于多线程的 EventExecutor 的分组抽象类 EventLoopGroup接口:提供注册通
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
面试高频算法、akka、多线程、NIO、Netty、SpringBoot、Spark&&Flink 等 Java是一种高性能、跨平台的面向对象编程语言。它由Sun Microsystems(现在是Oracle Corporation)的James Gosling等人在1995年推出...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...
原理是初始化颜色选择按钮,然后为颜色选择按钮增加事件处理事件,最后实例化颜色选择器。 Java二进制IO类与文件复制操作实例 16个目标文件 内容索引:Java源码,初学实例,二进制,文件复制 Java二进制IO类与文件...