- 浏览: 949342 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 事件执行器组和事件执行器定义及抽象实现:http://donald-draper.iteye.com/blog/2391257
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
引言:
上一篇文章我们看了多线程事件执行器组,先来回顾一下:
多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
本想这篇看一下多线程事件循环组呢,但是其实现了事件循环组,我们这篇先来看一下EventLoopGroup
//MultithreadEventLoopGroup
下面来看事件循环组EventLoopGroup的定义
从上面可以看出,事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,
事件循环组主要所做的工作为通道注册。
再来看事件循环EventLoop接口的定义:
//EventLoop
从上面可以看出事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。
再来看事件循环抽象实现:
抽象事件循环AbstractEventLoop继承了抽象事件执行器,实现了事件循环接口。
鉴于当这样已经把事件循环和事件循环组看完,那就来看下多线程事件循环组:
从上面可以看出,多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,
相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;
默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。
//NioEventLoopGroup
从Nio事件循环组创建事件循环可以看出事件循环为NioEventLoop,这也就是接下来的文章要看的,先列出
Nio事件循环声明继承树。
总结:
事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,事件循环组主要所做的工作为通道注册。
事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。抽象事件循环AbstractEventLoop继承了抽象事件执行器AbstractEventExecutor,实现了事件循环接口。
多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。
附:
在多线程事件循环组的静态语句中,初始化默认事件循环线程数有下面一段:
我们来看一下NettyRuntime
netty 多线程事件执行器组:http://donald-draper.iteye.com/blog/2391270
引言:
上一篇文章我们看了多线程事件执行器组,先来回顾一下:
多线程事件执行器组MultithreadEventExecutorGroup,内部有一个事件执行器数组存放组内的事件执行器;readonlyChildren为组内事件执行器集的可读包装集Set;terminatedChildren(AtomicInteger),用于记录已关闭的事件执行器数;termination为执行器组terminated异步任务结果;同时有一个事件执行器选择器chooser(EventExecutorChooser)。构造多线程执行器组,首先检查线程数参数,如果执行器不为空,则初始化线程执行器的线程工厂,创建事件执行器集,并根据执行器和相关参数创建事件执行器,实际创建方法为newChild,待子类实现,初始化事件执行器选择器,创建terminated事件执行器监听器,添加terminated事件执行器监听器到terminated异步任务结果,包装事件执行器集为只读集readonlyChildren。
获取执行器组的下一个事件执行器方法委托个内存的事件执行器选择器chooser;返回的迭代器为内部只读执行器集的迭代器;而关闭执行器组方法,实际为遍历管理的事件执行器集,关闭执行器;判断执行器组是否关闭和Terminated,当且仅当组内的事件执行器都关闭和Terminated时,才返回true;超时等待Terminated执行器组方法,实际为遍历事件执行器组超时等待时间耗完,则停止Terminated执行器组,否则,超时剩余等待时间timeLeft,Terminated事件执行器。
本想这篇看一下多线程事件循环组呢,但是其实现了事件循环组,我们这篇先来看一下EventLoopGroup
//MultithreadEventLoopGroup
package io.netty.channel; import io.netty.util.NettyRuntime; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.EventExecutorChooserFactory; import io.netty.util.concurrent.MultithreadEventExecutorGroup; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at * the same time. */ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
下面来看事件循环组EventLoopGroup的定义
package io.netty.channel; import io.netty.util.concurrent.EventExecutorGroup; /** * Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get * processed for later selection during the event loop. * */ public interface EventLoopGroup extends EventExecutorGroup { /** * Return the next {@link EventLoop} to use 返回下一个事件循环 */ @Override EventLoop next(); /** * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture} * will get notified once the registration was complete. 注册通道到事件循环,返回异步通道注册结果,当注册完成通知结果。 */ ChannelFuture register(Channel channel); /** * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed * {@link ChannelFuture} will get notified once the registration was complete and also will get returned. 注册可写异步通道任务结果关联的通道 */ ChannelFuture register(ChannelPromise promise); /** * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture} * will get notified once the registration was complete and also will get returned. *注册通道到事件循环,当注册完成,通知异步通道注册结果。 * @deprecated Use {@link #register(ChannelPromise)} instead. */ @Deprecated ChannelFuture register(Channel channel, ChannelPromise promise); }
从上面可以看出,事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,
事件循环组主要所做的工作为通道注册。
再来看事件循环EventLoop接口的定义:
//EventLoop
package io.netty.channel; import io.netty.util.concurrent.OrderedEventExecutor; /** * Will handle all the I/O operations for a {@link Channel} once registered. * * One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on * implementation details and internals. 一个事件循环实例可以处理多个通道,这个具体要依赖于具体的实现。 * */ public interface EventLoop extends OrderedEventExecutor, EventLoopGroup { //获取事件循环所属的事件循环组 @Override EventLoopGroup parent(); } //OrderedEventExecutor package io.netty.util.concurrent; /** * Marker interface for {@link EventExecutor}s that will process all submitted tasks in an ordered / serial fashion. 标记一个事件执行器顺序、串行的方式处理提交的任务 */ public interface OrderedEventExecutor extends EventExecutor { }
从上面可以看出事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。
再来看事件循环抽象实现:
package io.netty.channel; import io.netty.util.concurrent.AbstractEventExecutor; /** * Skeletal implementation of {@link EventLoop}. */ public abstract class AbstractEventLoop extends AbstractEventExecutor implements EventLoop { protected AbstractEventLoop() { } protected AbstractEventLoop(EventLoopGroup parent) { super(parent); } @Override public EventLoopGroup parent() { return (EventLoopGroup) super.parent(); } @Override public EventLoop next() { return (EventLoop) super.next(); } }
抽象事件循环AbstractEventLoop继承了抽象事件执行器,实现了事件循环接口。
鉴于当这样已经把事件循环和事件循环组看完,那就来看下多线程事件循环组:
package io.netty.channel; import io.netty.util.NettyRuntime; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.EventExecutorChooserFactory; import io.netty.util.concurrent.MultithreadEventExecutorGroup; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; /** * Abstract base class for {@link EventLoopGroup} implementations that handles their tasks with multiple threads at * the same time. */ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class); private static final int DEFAULT_EVENT_LOOP_THREADS;//默认事件循环线程数 static { //默认事件循环线程数为1和可用处理器数的2倍中的最大者 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } //下面的构造函数都是具体可以参考多线程事件执行器组的相应的构造 /** * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...) */ protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } /** * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...) */ protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); } /** * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, * EventExecutorChooserFactory, Object...) */ protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args); } @Override protected ThreadFactory newDefaultThreadFactory() { //创建的默认线程工厂的线程优先级默认为最大优先级 return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY); } @Override public EventLoop next() { return (EventLoop) super.next(); } //创建事务循环,待子类实现 @Override protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception; @Override public ChannelFuture register(Channel channel) { return next().register(channel); } @Override public ChannelFuture register(ChannelPromise promise) { return next().register(promise); } @Deprecated @Override public ChannelFuture register(Channel channel, ChannelPromise promise) { return next().register(channel, promise); } }
从上面可以看出,多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,
相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;
默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。
//NioEventLoopGroup
/** * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s. */ public class NioEventLoopGroup extends MultithreadEventLoopGroup { ... @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } }
从Nio事件循环组创建事件循环可以看出事件循环为NioEventLoop,这也就是接下来的文章要看的,先列出
Nio事件循环声明继承树。
/** * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a * {@link Selector} and so does the multi-plexing of these in the event loop. * */ public final class NioEventLoop extends SingleThreadEventLoop {
/** * Abstract base class for {@link EventLoop}s that execute all its submitted tasks in a single thread. * */ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
/** * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread. * */ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
总结:
事件循环组EventLoopGroup继承了事件执行器组EventExecutorGroup,next方法返回的为事件循环EventLoop,事件循环组主要所做的工作为通道注册。
事件循环EventLoop可理解为已顺序、串行的方式处理提交的任务的事件执行器EventExecutor。事件循环组EventLoopGroup可以理解为特殊的事件执行器组EventExecutorGroup;事件执行器组管理事件执行器,事件循环组管理事件循环。抽象事件循环AbstractEventLoop继承了抽象事件执行器AbstractEventExecutor,实现了事件循环接口。
多线程事件循环组MultithreadEventLoopGroup继承了多线程事件执行器组,实现了事件循环组接口,相关注册通道方法委托给多线程事件循环组的next事件循环,线程工程创建的线程优先级默认为最大线程优先级;默认事件循环线程数为1和可用处理器数的2倍中的最大者,这个线程数就是构造多线程事件执行器组事件执行器数量。
附:
在多线程事件循环组的静态语句中,初始化默认事件循环线程数有下面一段:
private static final int DEFAULT_EVENT_LOOP_THREADS;//默认事件循环线程数 static { //默认事件循环线程数为1和可用处理器数的2倍中的最大者 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } }
我们来看一下NettyRuntime
package io.netty.util; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.SystemPropertyUtil; import java.util.Locale; /** * A utility class for wrapping calls to {@link Runtime}. 运行时包装类 */ public final class NettyRuntime { //可利用处理器holder private static final AvailableProcessorsHolder holder = new AvailableProcessorsHolder(); /** * Holder class for available processors to enable testing. */ static class AvailableProcessorsHolder { private int availableProcessors;//可利用的处理器数量 /** * Set the number of available processors. *设置可利用的处理器数量 * @param availableProcessors the number of available processors * @throws IllegalArgumentException if the specified number of available processors is non-positive * @throws IllegalStateException if the number of available processors is already configured */ synchronized void setAvailableProcessors(final int availableProcessors) { ObjectUtil.checkPositive(availableProcessors, "availableProcessors"); if (this.availableProcessors != 0) { final String message = String.format( Locale.ROOT, "availableProcessors is already set to [%d], rejecting [%d]", this.availableProcessors, availableProcessors); throw new IllegalStateException(message); } this.availableProcessors = availableProcessors; } /** * Get the configured number of available processors. The default is {@link Runtime#availableProcessors()}. * This can be overridden by setting the system property "io.netty.availableProcessors" or by invoking * {@link #setAvailableProcessors(int)} before any calls to this method. * * @return the configured number of available processors */ @SuppressForbidden(reason = "to obtain default number of available processors") synchronized int availableProcessors() { if (this.availableProcessors == 0) { //获取可以用的系统可用的处理器数量 final int availableProcessors = SystemPropertyUtil.getInt( "io.netty.availableProcessors", Runtime.getRuntime().availableProcessors()); setAvailableProcessors(availableProcessors); } return this.availableProcessors; } } /** * Set the number of available processors. *设置可利用的处理器数量 * @param availableProcessors the number of available processors * @throws IllegalArgumentException if the specified number of available processors is non-positive * @throws IllegalStateException if the number of available processors is already configured */ @SuppressWarnings("unused,WeakerAccess") // this method is part of the public API public static void setAvailableProcessors(final int availableProcessors) { holder.setAvailableProcessors(availableProcessors); } /** * Get the configured number of available processors. The default is {@link Runtime#availableProcessors()}. This * can be overridden by setting the system property "io.netty.availableProcessors" or by invoking * {@link #setAvailableProcessors(int)} before any calls to this method. *获取配置的可用处理器数量,默认的为Runtime#availableProcessors()。在调用此方法前, 这个值可以被设置io.netty.availableProcessors属性或#setAvailableProcessors(int)重写。 * @return the configured number of available processors */ public static int availableProcessors() { return holder.availableProcessors(); } /** * No public constructor to prevent instances from being created. */ private NettyRuntime() { } }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1230netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 1964netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2354netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1255netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1248netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1516netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1755netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1322netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2745netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2090netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 1935netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1009netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1805netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1156netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1139netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 887netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1233netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2120netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 990netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1786netty 管道线定义-ChannelPipeline:htt ...
相关推荐
netty5深入源码解析多线程编程,netty5高性能并发编程的首选
使用Netty4实现多线程的消息分发,这是一个基于netty4做的一个异步通信模型。
Netty多线程案例集锦
Netty多线程并发编程
netty中的多线程并发应用,
netty的简介,netty是一种socket技术,实现长连接,此技术更加成熟1
Netty 通讯框架 多线程 源代码 刚写的一个通讯框架,可以启动多个client,并设置client数量,可以用于测试Netty框架支持多少用户数量
Netty服务器线程模型概览_线程模型
这是一本详细介绍了netty线程模型的书籍,包括 nio核心
多线程并发编程在Netty中的应用分析
多线程并发编程在Netty的应用分析 这个是带完整目录书签的文字版本,文本内容可以复制的哦
再根据本人实际学习体验总结而成。本部分内容可能不那么全面,但是我尽量挑选Netty中我认为比较重要的部分做讲解。
大致内容包括: Java内存模型 Java内存交互协议 Java的线程 Netty的并发编程分析 正确的使用锁 volatile的正确使用 CAS指令和原子类 线程安全类 读写锁的应用
netty 数据分包、组包、粘包处理机制(部分)1
Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 换句话说,Netty是一个NIO框架,使用它可以简单快速地开发网络应用程序,比如客户端和服务端的协议。Netty...
深入Hotspot源码与Linux内核理解NIO与Netty线程模型
基于JAVA IO, NIO, Netty, 多线程并发实战源码.zip
Netty核心精讲之Reactor线程模型源码分析 Netty核心精讲之Reactor线程模型源码分析
05.Netty线程模型.rar
Netty源码解读之线程