`
Donald_Draper
  • 浏览: 951110 次
社区版块
存档分类
最新评论

netty 通道初始化器ChannelInitializer

阅读更多
netty 消息解码器-ByteToMessageDecoder:http://donald-draper.iteye.com/blog/2388088
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
netty 通道处理器上下文定义:http://donald-draper.iteye.com/blog/2389214
netty 通道处理器上下文:http://donald-draper.iteye.com/blog/2389299
引言:
在前面的文章中,我们看了通道处理器,Channel管道线,和通道上下文,梳理了他们之间的关系,那么通道处理器如何添加通道的管道中呢?这个不得不提通道初始化器ChannelInitializer。
在netty的相关demo实例中,有这么一段:
 EventLoopGroup workerGroup = new NioEventLoopGroup();
  try {
  	//Bootstrap,用于配置客户端,一般为Socket通道
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.group(workerGroup)
       .channel(NioSocketChannel.class)
       .handler(new ChannelInitializer<SocketChannel>() {
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
          	 //添加安全套接字处理器和通道处理器到
               ChannelPipeline pipeline = ch.pipeline();
               if (sslCtx != null) {
              	 pipeline.addLast(sslCtx.newHandler(ch.alloc(), ip, port));
               }
               pipeline.addLast(new LoggingHandler(LogLevel.INFO));
               pipeline.addLast(new EchoClientHandler());
           }
       });
      InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
      //连接socket地址
      ChannelFuture f = bootstrap.connect(inetSocketAddress).sync();
      log.info("=========Client is start=========");
      //等待,直到连接关闭
      f.channel().closeFuture().sync();
  } finally {
  	workerGroup.shutdownGracefully();
}

今天我们关注的是通道处理器的初始化:
.handler(new ChannelInitializer<SocketChannel>() {
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
          	 //添加安全套接字处理器和通道处理器到
               ChannelPipeline pipeline = ch.pipeline();
               if (sslCtx != null) {
              	 pipeline.addLast(sslCtx.newHandler(ch.alloc(), ip, port));
               }
               pipeline.addLast(new LoggingHandler(LogLevel.INFO));
               pipeline.addLast(new EchoClientHandler());
           }
});

上面一段代码,有一个通道处理器初始化类ChannelInitializer,
这个类就是今天我们要看的:
package io.netty.channel;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.ConcurrentMap;

/**
 * A special {@link ChannelInboundHandler} which offers an easy way to initialize a {@link Channel} once it was
 * registered to its {@link EventLoop}.
 *ChannelInitializer是一个特殊的inboun通道处理器,一旦通道注册到事件循环中时,提供一个便捷的初始化通道的方式。
 * Implementations are most often used in the context of {@link Bootstrap#handler(ChannelHandler)} ,
 * {@link ServerBootstrap#handler(ChannelHandler)} and {@link ServerBootstrap#childHandler(ChannelHandler)} to
 * setup the {@link ChannelPipeline} of a {@link Channel}.
 *具体的实现一般在Bootstrap#handler(ChannelHandler) ,ServerBootstrap#handler(ChannelHandler)和
 ServerBootstrap#childHandler(ChannelHandler)中,设置Channel的管道。
 * <pre>
 *
 * public class MyChannelInitializer extends {@link ChannelInitializer} {
 *     public void initChannel({@link Channel} channel) {
 *         channel.pipeline().addLast("myHandler", new MyHandler());
 *     }
 * }
 *
 * {@link ServerBootstrap} bootstrap = ...;
 * ...
 * bootstrap.childHandler(new MyChannelInitializer());
 * ...
 * </pre>
 * Be aware that this class is marked as {@link Sharable} and so the implementation must be safe to be re-used.
 *注意此类为共享类型,具体的实现必须是线程安全的。
 * @param <C>   A sub-type of {@link Channel} 参数类型C为通道的子类
 */
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
    // We use a ConcurrentMap as a ChannelInitializer is usually shared between all Channels in a Bootstrap /
    // ServerBootstrap. This way we can reduce the memory usage compared to use Attributes.
    //由于通道初始器经常在Bootstrap /ServerBootstrap的所有通道中共享,所以我们用一个ConcurrentMap作为初始化器。
    //这种方式,相对于使用属性方式,减少了内存的使用。
    private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();
    //注册通道处理器动事件循环
     @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
        // the handler.
	//正常情况下这个方法不为被调用,因为handlerAdded可以用于初始化通道
        if (initChannel(ctx)) {
            // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
            // miss an event.
	    //调用上下文关联通道的fireChannelRegistered,确保不会调试事件
            ctx.pipeline().fireChannelRegistered();
        } else {
            // Called initChannel(...) before which is the expected behavior, so just forward the event.
	    //转发事件
            ctx.fireChannelRegistered();
        }
    }
    /**
     * {@inheritDoc} If override this method ensure you call super!
     如果重写,必须保证能够调用super
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //当通道注册到事件循环中
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order
            //在当前默认Channel管道下的实现下,总是返回true。在handlerAdded方法中调用initChannel方法好处是,如果一个初始化器
	    //添加到另外一个初始化器,不会从排序。所有的通道处理器将会以期望的顺序添加Channel管道中
            initChannel(ctx);
        }
    }
   //初始化通道处理器上下文
    @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        //将上下文放入通道初始化器的上下文Map中
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
	        //添加成功,则调用initChannel(C ch),初始化通道
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
		//异常发生则抛出log异常,并关闭上下文
                exceptionCaught(ctx, cause);
            } finally {
	        //最后从通道初始化器的上下文Map移除当前上下文
                remove(ctx);
            }
            return true;
        }
        return false;
    }
     /**
     * This method will be called once the {@link Channel} was registered. After the method returns this instance
     * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
     *一旦通道注册,此方法将会被调用。方法将会调用完毕,此处理器实例将会从通道的管道中移除
     * @param ch            the {@link Channel} which was registered.
     * @throws Exception    is thrown if an error occurs. In that case it will be handled by
     *                      {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default close
     *                      the {@link Channel}.
     */
    protected abstract void initChannel(C ch) throws Exception;
    /**
     * Handle the {@link Throwable} by logging and closing the {@link Channel}. Sub-classes may override this.
     处理异常,并关闭上下文
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);
        ctx.close();
    }
    //移除通道处理器上下文
    private void remove(ChannelHandlerContext ctx) {
        try {
	    //从当前通道的管道中移除上下文
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
	    //从通道初始化器上下文Map移除上下文
            initMap.remove(ctx);
        }
    }
}


总结:
通道初始化器ChannelInitializer实际上为Inbound通道处理器,当通道注册到事件循环中后,添加通道初始化器到通道,触发handlerAdded事件,然后将初始化器的上下文放入通道初始化器的上下文Map中,如果放入成功且先前不存在,initChannel(C ch),初始化通道,其中C为当前通道,我们可以获取C的管道,添加通道处理器到管道,这就是通道初始化器的作用。
添加完后,从通道的管道中移除初始化器上下文,并从通道初始化器的上下文Map中移除通道初始化器上下文。
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics