`
Donald_Draper
  • 浏览: 953843 次
社区版块
存档分类
最新评论

netty 通道处理器上下文

阅读更多
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
netty 通道处理器上下文定义:http://donald-draper.iteye.com/blog/2389214
引言:
前面一篇文章我们主要看了一下通道处理器上下文接口的定义,先来回顾一下:
通道处理器上下文ChannelHandlerContext,使通道处理器可以与管道和管道中其他处理器进行交互。当IO事件发生时,处理可以将事件转发给所属管道的下一个通道处理器,同时可以动态修改处理器所属的管道。通过上下文可以获取关联通道,处理器,事件执行器,上下文名,所属管道等信息。同时可以通过AttributeKey存储上下文属性,用alloc方法获取通道上下文的字节buf分配器,用于分配buf。
今天我们来看上下文的抽象实现。
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.OrderedEventExecutor;
import io.netty.util.internal.PromiseNotificationUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
    volatile AbstractChannelHandlerContext next;//上下文后继
    volatile AbstractChannelHandlerContext prev;//上下文前驱
    //上下文状态
    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
     通道处理器handlerAdded事件将要触发
     */
    private static final int ADD_PENDING = 1;
    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
     通道处理器handlerAdded事件已经触发
     */
    private static final int ADD_COMPLETE = 2;
    /**
     * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     通道处理器上下文已经从所属管道移除
     */
    private static final int REMOVE_COMPLETE = 3;
    /**
     * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
     * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     上下文初始化状态
     */
    private static final int INIT = 0;

    private final boolean inbound;//Inbound处理器上下文标志
    private final boolean outbound;//Outbound处理器上下文标志
    private final DefaultChannelPipeline pipeline;//上下文所属管道
    private final String name;//上下文名,对应通道处理器添加时的name
    private final boolean ordered;//事件执行器是否为顺序执行器

    // Will be set to null if no child executor should be used, otherwise it will be set to the
    // child executor.
    //如果没有子事件执行器可用,为空,否则为子事件执行器
    final EventExecutor executor;//事件执行器
    private ChannelFuture succeededFuture;//通道异步任务结果

    // Lazily instantiated tasks used to trigger events to a handler with different executor.
    // There is no need to make this volatile as at worse it will just create a few more instances then needed.
    //延时创建任务用于在不同的执行器中,处理触发事件,这些任务需要为volatile,最糟糕的情况下,仅仅创建比实际需要,多一点的任务
    private Runnable invokeChannelReadCompleteTask;//读完成任务线程
    private Runnable invokeReadTask;//上下文读任务线程
    private Runnable invokeChannelWritableStateChangedTask;//通道可写状态改变任务线程
    private Runnable invokeFlushTask;//上下文刷新任务线程

    private volatile int handlerState = INIT;

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
	//检查上下为name是否为空
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
	//是否为Ordered事件执行器
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
}

从上面可以看出抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。

下面我们来看Inbound事件处理,
先来看上下文处理通道的channelRegistered事件
@Override
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound());
        return this;
    }

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();//获取上下文事件执行器
    //如果事件执行器在当前事务循环,则直接调用上下文invokeChannelRegistered方法
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        //否则创建一个线程执行上下文invokeChannelRegistered方法,并有上下文事务执行器运行
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}
//触发通道channelRegistered事件
private void invokeChannelRegistered() {
    //如果通道处理器已添加到管道
    if (invokeHandler()) {
        try {
	    //触发通道处理器的channelRegistered事件
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
	   //通知异常
            notifyHandlerException(t);
        }
    } else {
        //转发事件消息
        fireChannelRegistered();
    }
}

上述方法有一下几点要看
1.
//判断通道处理器已添加到管道
/**
 * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
 * yet. If not return {@code false} and if called or could not detect return {@code true}.
 *确保通道处理器的handlerAdded方法已触发。
 * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
 * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
 * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
 如果失败,则不会调用通道处理器的相关事件处理方法,而是转发事件。这种情况主要针对通道处理器已经添加到管道,
 但通道处理器handlerAdded方法没有被调用的情况,即通道处理器关联的上下文已经添加管道上下文链,但并没有更新上下文状态
 和触发通道处理器的handlerAdded方法。
 */
private boolean invokeHandler() {
    // Store in local variable to reduce volatile reads.
    int handlerState = this.handlerState;
    return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}

2.
//转发事件消息
Override
public ChannelHandlerContext fireChannelRegistered() {
    //转发事件给上下文所属管道的下一个上下文
    invokeChannelRegistered(findContextInbound());
    return this;
}
//获取上下文所属管道的下一个Inbound上下文
private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
}

再来看异常处理
 
//通知异常
notifyHandlerException(t);

private void notifyHandlerException(Throwable cause) {
     //判断异常堆栈信息中是否存在exceptionCaught方法信息
     if (inExceptionCaught(cause)) {
         //如果是通道处理器exceptionCaught方法抛出的异常,则直接log
         if (logger.isWarnEnabled()) {
             logger.warn(
                     "An exception was thrown by a user handler " +
                             "while handling an exceptionCaught event", cause);
         }
         return;
     }
     //否则,触发通道处理器异常处理方法exceptionCaught
     invokeExceptionCaught(cause);
 }

//判断异常堆栈信息中是否存在exceptionCaught方法信息
 private static boolean inExceptionCaught(Throwable cause) {
     do {
         //获取异常堆栈frame信息
         StackTraceElement[] trace = cause.getStackTrace();
         if (trace != null) {
             for (StackTraceElement t : trace) {
                 if (t == null) {
                     break;
                 }
		 //是否是exceptionCaught方法抛出的异常,是则返回true
                 if ("exceptionCaught".equals(t.getMethodName())) {
                     return true;
                 }
             }
         }
         cause = cause.getCause();
     } while (cause != null);

     return false;
 }

//StackTraceElement
package java.lang;

import java.util.Objects;

/**
 * An element in a stack trace, as returned by {@link
 * Throwable#getStackTrace()}.  Each element represents a single stack frame.
 * All stack frames except for the one at the top of the stack represent
 * a method invocation.  The frame at the top of the stack represents the
 * execution point at which the stack trace was generated.  Typically,
 * this is the point at which the throwable corresponding to the stack trace
 * was created.
 用于描述异常堆栈的frame
 *
 * @since  1.4
 * @author Josh Bloch
 */
public final class StackTraceElement implements java.io.Serializable {
    // Normally initialized by VM (public constructor added in 1.5)
    private String declaringClass;//异常类
    private String methodName;//异常发生点方法
    private String fileName;//异常发生的文件名
    private int    lineNumber;//异常发生的行号
    ...
}

再来看,触发通道处理器异常处理方法exceptionCaught
private void invokeExceptionCaught(final Throwable cause) {
        //如果通道处理器已添加到管道
        if (invokeHandler()) {
            try {
	        //触发通道处理器exceptionCaught事件
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                        "An exception {}" +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:",
                        ThrowableUtil.stackTraceToString(error), cause);
                } else if (logger.isWarnEnabled()) {
                    logger.warn(
                        "An exception '{}' [enable DEBUG level for full stacktrace] " +
                        "was thrown by a user handler's exceptionCaught() " +
                        "method while handling the following exception:", error, cause);
                }
            }
        } else {
	   //否则转递IO异常事件给管道中的下一个Inbound处理器
            fireExceptionCaught(cause);
        }
}
 @Override
 public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
     invokeExceptionCaught(next, cause);
     return this;
 }

从上面可以看出,上下文处理通道fireChannelRegistered事件,
如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;
触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,
只不过触发的是通道处理器的相应事件;
前面的文章已讲,可以参考
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
我们这里不再赘述。

再来看Outbound地址绑定事件的处理:

@Override
  public ChannelFuture bind(SocketAddress localAddress) {
      return bind(localAddress, newPromise());
  }

//创建通道任务DefaultChannelPromise
/**
 * The default {@link ChannelPromise} implementation.  It is recommended to use {@link Channel#newPromise()} to create
 * a new {@link ChannelPromise} rather than calling the constructor explicitly.
 */
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {

    private final Channel channel;
    private long checkpoint;
    ...
    /**
     * Creates a new instance.
     *
     * @param channel
     *        the {@link Channel} associated with this future
     */
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = channel;
    }
}

//绑定socket地址
 @Override
 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
     if (localAddress == null) {
         throw new NullPointerException("localAddress");
     }
     if (isNotValidPromise(promise, false)) {
        //非可写通道任务,直接返回
         // cancelled
         return promise;
     }
    //从当前上下文开始(尾部),向前找到第一个Outbound上下文,处理地址绑定事件
     final AbstractChannelHandlerContext next = findContextOutbound();
     //获取上下为事件执行器
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         //如果事件执行器线程在事件循环中,则直接委托给invokeBind
         next.invokeBind(localAddress, promise);
     } else {
         safeExecute(executor, new Runnable() {
             @Override
             public void run() {
                 next.invokeBind(localAddress, promise);
             }
         }, promise, null);
     }
     return promise;
 }
//触发通道处理器地址绑定事件
 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
     
     if (invokeHandler()) {//如果通道处理器已经添加到管道中
         try {
	    //触发Outbound通道处理器的bind事件方法
             ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
         } catch (Throwable t) {
	     //通知异常
             notifyOutboundHandlerException(t, promise);
         }
     } else {
        //否则传递绑定事件给管道中的下一个Outbound上下文
         bind(localAddress, promise);
     }
 }

  private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
	        //执行事件失败
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

再来看一下寻找Outbound处理器:
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

最后来看一下异常处理
//通知异常
notifyOutboundHandlerException(t, promise);

private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
    // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
    // false.
    //直接委托给异步任务结果通知工具PromiseNotificationUtil
    PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
}


//PromiseNotificationUtil
package io.netty.util.internal;

import io.netty.util.concurrent.Promise;
import io.netty.util.internal.logging.InternalLogger;

/**
 * Internal utilities to notify {@link Promise}s.
内部异步任务结果通知工具
 */
public final class PromiseNotificationUtil {

    private PromiseNotificationUtil() { }

    /**
     * Try to cancel the {@link Promise} and log if {@code logger} is not {@code null} in case this fails.
     通知异步任务取消
     */
    public static void tryCancel(Promise<?> p, InternalLogger logger) {
        if (!p.cancel(false) && logger != null) {
            Throwable err = p.cause();
            if (err == null) {
                logger.warn("Failed to cancel promise because it has succeeded already: {}", p);
            } else {
                logger.warn(
                        "Failed to cancel promise because it has failed already: {}, unnotified cause:",
                        p, err);
            }
        }
    }

    /**
     * Try to mark the {@link Promise} as success and log if {@code logger} is not {@code null} in case this fails.
     通知异步任务成功
     */
    public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) {
        if (!p.trySuccess(result) && logger != null) {
            Throwable err = p.cause();
            if (err == null) {
                logger.warn("Failed to mark a promise as success because it has succeeded already: {}", p);
            } else {
                logger.warn(
                        "Failed to mark a promise as success because it has failed already: {}, unnotified cause:",
                        p, err);
            }
        }
    }

    /**
     * Try to mark the {@link Promise} as failure and log if {@code logger} is not {@code null} in case this fails.
     通知异步任务取消失败
     */
    public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) {
        if (!p.tryFailure(cause) && logger != null) {
            Throwable err = p.cause();
            if (err == null) {
                logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause);
            } else {
                logger.warn(
                        "Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}",
                        p, ThrowableUtil.stackTraceToString(err), cause);
            }
        }
    }

}


从上面可以看出上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务
失败,并log异常日志。
上下文,处理其他Outbound事件的思路,基本相同,
前面文章已将,可以参考
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
我们这里不再赘述。

再来看其他方法,很简单,不用说了:
@Override
public Channel channel() {
    return pipeline.channel();
}
@Override
public ChannelPipeline pipeline() {
    return pipeline;
}
@Override
public ByteBufAllocator alloc() {
    return channel().config().getAllocator();
}
@Override
public EventExecutor executor() {
    if (executor == null) {
        return channel().eventLoop();
    } else {
        return executor;
    }
}
@Override
public String name() {
    return name;
}
@Override
public boolean isRemoved() {
    return handlerState == REMOVE_COMPLETE;
}

@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
    return channel().attr(key);
}

@Override
public <T> boolean hasAttr(AttributeKey<T> key) {
    return channel().hasAttr(key);
}
//返回可读的资源泄漏信息,用于追踪资源泄漏情况
 @Override
 public String toHintString() {
     return '\'' + name + "' will handle the message from this point.";
 }

 @Override
 public String toString() {
     return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
 }


再来通道处理器上下文的默认实现:
//通道处理器上下文默认实现DefaultChannelHandlerContext:
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    private final ChannelHandler handler;//关联通道处理器
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }
    @Override
    public ChannelHandler handler() {
        return handler;
    }
    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }
    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

通道处理器上下文默认实现DefaultChannelHandlerContext内部关联一个通道处理器。
总结:

       抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。上下文关联的通道通道处理器在具体的实现中定义,比如通道处理器上下文默认实现为DefaultChannelHandlerContext,内部关联一个通道处理器。
      上下文处理通道fireChannelRegistered事件,如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发
上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,只不过触发的是通道处理器的相应事件;如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。
      上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,
寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务失败,并log异常日志。
      netty的通道处理器上下文和mina的会话有点像,都拥有描述通道Handler的Context;
不同的时mina中的会话与通道直接关联,而netty上下文与通道间是通过Channel管道关联起来,mina中的过滤链是依附于会话,而netty上下文依附于Channel管道,mina中的IO事件执行器为IoProcessor,netty中的IO事件的处理委托给事件循环
或上下文的子事件执行器。


附:
/**
 * A hint object that provides human-readable message for easier resource leak tracking.
 */
public interface ResourceLeakHint {
    /**
     * Returns a human-readable message that potentially enables easier resource leak tracking.
     */
    String toHintString();
}
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics