`
Donald_Draper
  • 浏览: 952626 次
社区版块
存档分类
最新评论

netty 默认Channel管道线-Inbound和Outbound事件处理

阅读更多
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
netty 默认Channel管道线-添加通道处理器:http://donald-draper.iteye.com/blog/2388726
netty 默认Channel管道线-通道处理器移除与替换:http://donald-draper.iteye.com/blog/2388793
引言:
前面一篇文章我们看了管道线处理器的移除和替换,先来回顾一下:
   无论是根据名称,处理器句柄,还是根据类型移除通道处理器,都是首先获取对应的处理器上下文,从管道中移除对应的上下文,如果通道已经从事件循环反注册,则添加移除回调任务到管道回调任务链,否则直接创建线程(触发上下文关联的处理器handlerRemoved事件,更新上下文状态为已移除),有上下文关联的事件执行器执行。
      无论是根据名称,处理器句柄,还是根据类型替换通道处理器,都是首先获取对应的
处理器上下文,然后添加新上下文到管道中原始上下文的位置,并将原始上下文的前驱和后继同时指向新上下文,以便转发剩余的buf内容;可以简单理解为添加新上下文,移除原始上下文,注意必须先添加,后移除,因为移除操作会触发channelRead和flush事件,而这些事件处理必须在handlerAdded事件后。
今天我们来看管道线触发Inbound和Outbound事件的操作:
//DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline 

//ChannelPipeline
public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> 

从管道线类声明来看继承了Inbound和Outbound通道Invoker,来看相关的方法实现:
先来看Inbound事件触发操作:
//通道注册
@Override
public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

//AbstractChannelHandlerContext
//触发上下文的invokeChannelRegistered方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();//获取上下文事件执行器
    //如果事件执行器在当前事务循环,则直接调用上下文invokeChannelRegistered方法
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        //否则创建一个线程执行上下文invokeChannelRegistered方法,并有有上下文事务执行器运行
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}
//触发通道channelRegistered事件
private void invokeChannelRegistered() {
    //如果通道处理器已添加到管道
    if (invokeHandler()) {
        try {
	    //触发通道处理器的channelRegistered事件
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        //转发事件消息
        fireChannelRegistered();
    }
}

上述方法有一下几点要看
1.
//判断通道处理器已添加到管道
/**
 * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
 * yet. If not return {@code false} and if called or could not detect return {@code true}.
 *确保通道处理器的handlerAdded方法已触发。
 * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
 * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
 * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
 如果失败,则不会调用通道处理器的相关事件处理方法,而是转发事件。这种情况主要针对通道处理器已经添加到管道,
 但通道处理器handlerAdded方法没有被调用的情况,即通道处理器关联的上下文已经添加管道上下文链,但并没有更新上下文状态
 和触发通道处理器的handlerAdded方法。
 */
private boolean invokeHandler() {
    // Store in local variable to reduce volatile reads.
    int handlerState = this.handlerState;
    return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}

2.
//转发事件消息
Override
public ChannelHandlerContext fireChannelRegistered() {
    //转发事件给上下文所属管道的下一个上下文
    invokeChannelRegistered(findContextInbound());
    return this;
}

//获取上下文所属管道的下一个Inbound上下文
private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
}

从上面可以看出,管道处理通道注册方法fireChannelRegistered,首先从
头部上下文开始,如果上下文已经添加到管道,则触发上下文关联的通道处理器的
channelRegistered事件,否则转发事件给上下文所属管道的下一个上下文。

再来看通道反注册方法
@Override
public final ChannelPipeline fireChannelUnregistered() {
    AbstractChannelHandlerContext.invokeChannelUnregistered(head);
    return this;
}

处理过程与fireChannelRegistered方法思路相同,只不过触发的通道处理器事件为channelUnregistered
,简单看一下
//AbstractChannelHandlerContext
static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelUnregistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelUnregistered();
            }
        });
    }
}
private void invokeChannelUnregistered() {
    if (invokeHandler()) {
        try {
	   //
            ((ChannelInboundHandler) handler()).channelUnregistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelUnregistered();
    }
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
    invokeChannelUnregistered(findContextInbound());
    return this;
}


再来看管道处理关联通道激活事件:
//通道激活
@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

//AbstractChannelHandlerContext
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
	    //触发处理器channelActive事件
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelActive();
    }
}

@Override
public ChannelHandlerContext fireChannelActive() {
    invokeChannelActive(findContextInbound());
    return this;
}


//通道断开
@Override
public final ChannelPipeline fireChannelInactive() {
    AbstractChannelHandlerContext.invokeChannelInactive(head);
    return this;
}

 static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         next.invokeChannelInactive();
     } else {
         executor.execute(new Runnable() {
             @Override
             public void run() {
                 next.invokeChannelInactive();
             }
         });
     }
 }

 private void invokeChannelInactive() {
     if (invokeHandler()) {
         try {
	     //触发处理器channelInactive事件
             ((ChannelInboundHandler) handler()).channelInactive(this);
         } catch (Throwable t) {
             notifyHandlerException(t);
         }
     } else {
         fireChannelInactive();
     }
 }

 @Override
 public ChannelHandlerContext fireChannelInactive() {
     invokeChannelInactive(findContextInbound());
     return this;
 }

//IO异常
@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
    AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
    return this;
}


//AbstractChannelHandlerContext
static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeExceptionCaught(cause);
    } else {
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeExceptionCaught(cause);
                }
            });
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to submit an exceptionCaught() event.", t);
                logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
            }
        }
    }
}

private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
	    //触发处理器exceptionCaught事件
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            if (logger.isDebugEnabled()) {
                logger.debug(
                    "An exception {}" +
                    "was thrown by a user handler's exceptionCaught() " +
                    "method while handling the following exception:",
                    ThrowableUtil.stackTraceToString(error), cause);
            } else if (logger.isWarnEnabled()) {
                logger.warn(
                    "An exception '{}' [enable DEBUG level for full stacktrace] " +
                    "was thrown by a user handler's exceptionCaught() " +
                    "method while handling the following exception:", error, cause);
            }
        }
    } else {
        fireExceptionCaught(cause);
    }
}

@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    invokeExceptionCaught(next, cause);
    return this;
}


//触发用户事件
@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
    AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
    return this;
}

//AbstractChannelHandlerContext
static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
    ObjectUtil.checkNotNull(event, "event");
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeUserEventTriggered(event);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeUserEventTriggered(event);
            }
        });
    }
}

private void invokeUserEventTriggered(Object event) {
    if (invokeHandler()) {
        try {
	     //触发处理器userEventTriggered事件
            ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireUserEventTriggered(event);
    }
}

@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
    invokeUserEventTriggered(findContextInbound(), event);
    return this;
}


//读数据
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

//AbstractChannelHandlerContext

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    //记录消息引用对象,用于内存泄漏时,调试
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
	    //触发处理器channelRead事件
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

读数据操作过程有一点我们需要关注一下:
//记录消息引用对象,用于内存泄漏时,调试
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);

//DefaultChannelPipeline
  final Object touch(Object msg, AbstractChannelHandlerContext next) {
        //如果内存泄漏探测开启,则记录消息引用对象,否则直至返回消息对象
        return touch ? ReferenceCountUtil.touch(msg, next) : msg;
    }

//ReferenceCountUtil
/**
     * Tries to call {@link ReferenceCounted#touch(Object)} if the specified message implements
     * {@link ReferenceCounted}.  If the specified message doesn't implement {@link ReferenceCounted},
     * this method does nothing.
     */
    @SuppressWarnings("unchecked")
    public static <T> T touch(T msg, Object hint) {
        if (msg instanceof ReferenceCounted) {
            return (T) ((ReferenceCounted) msg).touch(hint);
        }
        return msg;
    }

//ReferenceCounted
/**
     * Records the current access location of this object with an additional arbitrary information for debugging
     * purposes.  If this object is determined to be leaked, the information recorded by this operation will be
     * provided to you via {@link ResourceLeakDetector}.
     */
    ReferenceCounted touch(Object hint);


//读数据完成
@Override
public final ChannelPipeline fireChannelReadComplete() {
    AbstractChannelHandlerContext.invokeChannelReadComplete(head);
    return this;
}

//AbstractChannelHandlerContext
 static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         next.invokeChannelReadComplete();
     } else {
         Runnable task = next.invokeChannelReadCompleteTask;
         if (task == null) {
             next.invokeChannelReadCompleteTask = task = new Runnable() {
                 @Override
                 public void run() {
                     next.invokeChannelReadComplete();
                 }
             };
         }
         executor.execute(task);
     }
 }

 private void invokeChannelReadComplete() {
     if (invokeHandler()) {
         try {
             ((ChannelInboundHandler) handler()).channelReadComplete(this);
         } catch (Throwable t) {
             notifyHandlerException(t);
         }
     } else {
         fireChannelReadComplete();
     }
 }
@Override
 public ChannelHandlerContext fireChannelReadComplete() {
     invokeChannelReadComplete(findContextInbound());
     return this;
 }

//写状态改变
@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
    AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
    return this;
}

//AbstractChannelHandlerContext

static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         next.invokeChannelWritabilityChanged();
     } else {
         Runnable task = next.invokeChannelWritableStateChangedTask;
         if (task == null) {
             next.invokeChannelWritableStateChangedTask = task = new Runnable() {
                 @Override
                 public void run() {
                     next.invokeChannelWritabilityChanged();
                 }
             };
         }
         executor.execute(task);
     }
 }

 private void invokeChannelWritabilityChanged() {
     if (invokeHandler()) {
         try {
             ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
         } catch (Throwable t) {
             notifyHandlerException(t);
         }
     } else {
         fireChannelWritabilityChanged();
     }
 }

 @Override
 public ChannelHandlerContext fireChannelWritabilityChanged() {
     invokeChannelWritabilityChanged(findContextInbound());
     return this;
 }

从上面可以看出管道处理Inbound事件首先从头部上下文开始,直到尾部上下文,最后默认直接丢弃。

再来看Outbound事件触发操作:
//绑定socket的地址
@Override
public final ChannelFuture bind(SocketAddress localAddress) {  
   //从管道尾部开始
    return tail.bind(localAddress);
}

//AbstractChannelHandlerContext
@Override
  public ChannelFuture bind(SocketAddress localAddress) {
      return bind(localAddress, newPromise());
  }

//创建通道任务DefaultChannelPromise
/**
 * The default {@link ChannelPromise} implementation.  It is recommended to use {@link Channel#newPromise()} to create
 * a new {@link ChannelPromise} rather than calling the constructor explicitly.
 */
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {

    private final Channel channel;
    private long checkpoint;
    ...
    /**
     * Creates a new instance.
     *
     * @param channel
     *        the {@link Channel} associated with this future
     */
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = channel;
    }
}

//绑定socket地址
 @Override
 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
     if (localAddress == null) {
         throw new NullPointerException("localAddress");
     }
     if (isNotValidPromise(promise, false)) {
        //非可写通道任务,直接返回
         // cancelled
         return promise;
     }
    //从当前上下文开始(尾部),向前找到第一个Outbound上下文,处理地址绑定事件
     final AbstractChannelHandlerContext next = findContextOutbound();
     //获取上下为事件执行器
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         //如果事件执行器线程在事件循环中,则直接委托给invokeBind
         next.invokeBind(localAddress, promise);
     } else {
         safeExecute(executor, new Runnable() {
             @Override
             public void run() {
                 next.invokeBind(localAddress, promise);
             }
         }, promise, null);
     }
     return promise;
 }
//触发通道处理器地址绑定事件
 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
     
     if (invokeHandler()) {//如果通道处理器已经添加到管道中
         try {
	    //触发Outbound通道处理器的bind事件方法
             ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
         } catch (Throwable t) {
             notifyOutboundHandlerException(t, promise);
         }
     } else {
        //否则传递绑定事件给管道中的下一个Outbound上下文
         bind(localAddress, promise);
     }
 }

  private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

再来看一下寻找Outbound处理器:
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}


从上面可以看出管道处理通道处理器地址绑定bind事件,首先从管道上下文链的尾部开始,
寻找Outbound上下文,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,
则触发通道处理器地址绑定事件#invokeBind,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;#invokeBind首先判断通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。

管道处理器其他Outbound事件的思路,基本相同,我们只展示一下代码
//连接指定远端地址
@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
    return tail.connect(remoteAddress);
}

//AbstractChannelHandlerContext
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
    return connect(remoteAddress, newPromise());
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return connect(remoteAddress, null, promise);
}
@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        connect(remoteAddress, localAddress, promise);
    }
}


//绑定本地地址,连接远端地址
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return tail.connect(remoteAddress, localAddress);
}

//AbstractChannelHandlerContext
@Override
 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
     return connect(remoteAddress, localAddress, newPromise());
 }

//断开通道连接
@Override
public final ChannelFuture disconnect() {
    return tail.disconnect();
}

//AbstractChannelHandlerContext
 @Override
 public ChannelFuture disconnect() {
     return disconnect(newPromise());
 }

@Override
public ChannelFuture disconnect(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
        // So far, UDP/IP is the only transport that has such behavior.
        if (!channel().metadata().hasDisconnect()) {
	    //如果还没有连接,则关闭通道
            next.invokeClose(promise);
        } else {
	   //否则断开通道
            next.invokeDisconnect(promise);
        }
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                if (!channel().metadata().hasDisconnect()) {
                    next.invokeClose(promise);
                } else {
                    next.invokeDisconnect(promise);
                }
            }
        }, promise, null);
    }
    return promise;
}
//触发通道处理器关闭事件
  private void invokeClose(ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).close(this, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            close(promise);
        }
    }
 //触发通道处理器断开事件
 private void invokeDisconnect(ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).disconnect(this, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            disconnect(promise);
        }
    }

//关闭通道
@Override
public final ChannelFuture close() {
    return tail.close();
}

//AbstractChannelHandlerContext
@Override
 public ChannelFuture close() {
     return close(newPromise());
 }
 @Override
public ChannelFuture close(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeClose(promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeClose(promise);
            }
        }, promise, null);
    }

    return promise;
}

private void invokeClose(ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).close(this, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        close(promise);
    }
}


//反注册
@Override
public final ChannelFuture deregister() {
    return tail.deregister();
}

//AbstractChannelHandlerContext
@Override
public ChannelFuture deregister() {
    return deregister(newPromise());
}
@Override
public ChannelFuture deregister(final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeDeregister(promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeDeregister(promise);
            }
        }, promise, null);
    }

    return promise;
}

private void invokeDeregister(ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).deregister(this, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        deregister(promise);
    }
}

//刷新写请求数据
@Override
public final ChannelPipeline flush() {
    tail.flush();
    return this;
}

//AbstractChannelHandlerContext
@Override
public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        //触发通道处理器刷新事件操作
        next.invokeFlush();
    } else {
        //获取当前上下文的刷新任务线程
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        safeExecute(executor, task, channel().voidPromise(), null);
    }

    return this;
}
 private void invokeFlush() {
    if (invokeHandler()) {
        invokeFlush0();
    } else {
        flush();
    }
}

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

//读操作
@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

//AbstractChannelHandlerContext
 @Override
public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
       //获取上下文读任务线程
        Runnable task = next.invokeReadTask;
        if (task == null) {
            next.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeRead();
                }
            };
        }
        executor.execute(task);
    }

    return this;
}

private void invokeRead() {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).read(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        read();
    }
}


//写消息
@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

//AbstractChannelHandlerContext
 @Override
 public ChannelFuture write(Object msg) {
     return write(msg, newPromise());
 }
 @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return promise;
            }
        } catch (RuntimeException e) {
	    //出现异常,释放消息对象
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);

        return promise;
    }

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }


//写消息,并发送
@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

//AbstractChannelHandlerContext
@Override
public ChannelFuture writeAndFlush(Object msg) {
    return writeAndFlush(msg, newPromise());
}
 @Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    if (isNotValidPromise(promise, true)) {
        ReferenceCountUtil.release(msg);
        // cancelled
        return promise;
    }

    write(msg, true, promise);

    return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
	    //如果刷新,则调用invokeWriteAndFlush
            next.invokeWriteAndFlush(m, promise);
        } else {
	    //否则调用invokeWrite
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
	    //创建写刷新任务线程
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
	    //创建写任务线程
            task = WriteTask.newInstance(next, m, promise);
        }
	//执行任务线程
        safeExecute(executor, task, promise, m);
    }
}

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}


通道处理器上下文的写WriteTask和刷新写WriteAndFlushTask任务线程定义,见附篇AbstractChannelHandlerContext部分。

下面的这些带可以通道任务参数ChannelPromise,与上面相应的方法基本相同不再说
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

@Override
public final ChannelFuture connect(
        SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, localAddress, promise);
}

@Override
public final ChannelFuture disconnect(ChannelPromise promise) {
    return tail.disconnect(promise);
}

@Override
public final ChannelFuture close(ChannelPromise promise) {
    return tail.close(promise);
}

@Override
public final ChannelFuture deregister(final ChannelPromise promise) {
    return tail.deregister(promise);
}

@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
    return tail.write(msg, promise);
}

@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return tail.writeAndFlush(msg, promise);
}

从上面可以看,管道处理Outbound相关事件,从尾部上下文到头部上下文,到达头部时,交由
上下文所属管道关联的Channel的Unsafe处理。

总结:

      管道处理通道注册方法fireChannelRegistered,首先从头部上下文开始,如果上下文已经添加到管道,则触发上下文关联的通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个上下文;其他处罚Inbound事件的处理过程与fireChannelRegistered方法思路相同,只不过触发的通道处理器的相应事件。管道处理Inbound事件首先从头部上下文开始,直到尾部上下文,最后默认直接丢弃。
     管道处理通道处理器地址绑定bind事件,首先从管道上下文链的尾部开始,寻找Outbound上下文,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发通道处理器地址绑定事件#invokeBind,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;#invokeBind首先判断通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。管道处理Outbound相关事件,从尾部上下文到头部上下文,到达头部时,交由
上下文所属管道关联的Channel的Unsafe处理。

附:
//DefaultChannelPipeline
//创建通道异步任务结果
@Override
public final ChannelPromise newPromise() {
    return new DefaultChannelPromise(channel);
}

@Override
public final ChannelProgressivePromise newProgressivePromise() {
    return new DefaultChannelProgressivePromise(channel);
}

@Override
public final ChannelFuture newSucceededFuture() {
    return succeededFuture;
}

@Override
public final ChannelFuture newFailedFuture(Throwable cause) {
    return new FailedChannelFuture(channel, null, cause);
}

@Override
public final ChannelPromise voidPromise() {
    return voidPromise;
}

再来看管道的其他方法
//获取通道消息大小估算Handler
final MessageSizeEstimator.Handle estimatorHandle() {
    if (estimatorHandle == null) {
        estimatorHandle = channel.config().getMessageSizeEstimator().newHandle();
    }
    return estimatorHandle;
}

//MessageSizeEstimator
/**
 * Responsible to estimate size of a message. The size represent how much memory the message will ca. reserve in
 * memory.
 负责估算消息的大小。大小表示消息需要多少内存,以便预留内存
 */
public interface MessageSizeEstimator {
    /**
     * Creates a new handle. The handle provides the actual operations.
     */
    Handle newHandle();
    interface Handle {
        /**
         * Calculate the size of the given message.
	 计算给定消息的大小
         *
         * @param msg       The message for which the size should be calculated
         * @return size     The size in bytes. The returned size must be >= 0
         */
        int size(Object msg);
    }
}

//获取事件执行器组的事件执行器
private EventExecutor childExecutor(EventExecutorGroup group) {
     if (group == null) {
         return null;
     }
     //获取通道的SINGLE_EVENTEXECUTOR_PER_GROUP的配置项
     //是否每个事件执行器组拥有一个执行器
     Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
     if (pinEventExecutor != null && !pinEventExecutor) {
         //如果可以同用事件执行器,则返回事件执行器组的事件执行器
         return group.next();
     }
     //否则获取管道的子事件执行器
     Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
     if (childExecutors == null) {
         //如果子执行器为空,则创建四个事件执行器组
         // Use size of 4 as most people only use one extra EventExecutor.
         childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
     }
     // Pin one of the child executors once and remember it so that the same child executor
     // is used to fire events for the same channel.
     //获取分组的事件执行器
     EventExecutor childExecutor = childExecutors.get(group);
     if (childExecutor == null) {
         //如果事件执行器组的事件执行器为空,则获取下一个事件执行器
         childExecutor = group.next();
         childExecutors.put(group, childExecutor);
     }
     return childExecutor;
 }

//EventExecutorGroup
/**
 * The {@link EventExecutorGroup} is responsible for providing the {@link EventExecutor}'s to use
 * via its {@link #next()} method. Besides this, it is also responsible for handling their
 * life-cycle and allows shutting them down in a global fashion.
 *
 */
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {


//EventExecutor
/**
 * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
 * with some handy methods to see if a {@link Thread} is executed in a event loop.
 * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
 * way to access methods.
 *
 */
public interface EventExecutor extends EventExecutorGroup {


//IdentityHashMap
/**
 * This class implements the <tt>Map</tt> interface with a hash table, using
 * reference-equality in place of object-equality when comparing keys (and
 * values).  In other words, in an <tt>IdentityHashMap</tt>, two keys
 * <tt>k1</tt> and <tt>k2</tt> are considered equal if and only if
 * <tt>(k1==k2)</tt>.  (In normal <tt>Map</tt> implementations (like
 * <tt>HashMap</tt>) two keys <tt>k1</tt> and <tt>k2</tt> are considered equal
 * if and only if <tt>(k1==null ? k2==null : k1.equals(k2))</tt>.)
 ...
  *
 * <p><b>This class is <i>not</i> a general-purpose <tt>Map</tt>
 * implementation!  While this class implements the <tt>Map</tt> interface, it
 * intentionally violates <tt>Map's</tt> general contract, which mandates the
 * use of the <tt>equals</tt> method when comparing objects.  This class is
 * designed for use only in the rare cases wherein reference-equality
 * semantics are required.</b>
 *
 * <p>A typical use of this class is <i>topology-preserving object graph
 * transformations</i>, such as serialization or deep-copying.  To perform such
 * a transformation, a program must maintain a "node table" that keeps track
 * of all the object references that have already been processed.  The node
 * table must not equate distinct objects even if they happen to be equal.
 * Another typical use of this class is to maintain <i>proxy objects</i>.  For
 * example, a debugging facility might wish to maintain a proxy object for
 * each object in the program being debugged.
  *
 * @see     System#identityHashCode(Object)
 * @see     Object#hashCode()
 * @see     Collection
 * @see     Map
 * @see     HashMap
 * @see     TreeMap
 * @author  Doug Lea and Josh Bloch
 * @since   1.4
 IdentityHashMap与普通Map的区别在于Key的相等的条件不同,一般判断key是否相等
 为if(k1==null ? k2==null : k1.equals(k2))则相等,而IdentityHashMap为k1==k2
 相同,则任务是相等,可以用于存储
 */
public class IdentityHashMap<K,V>
    extends AbstractMap<K,V>


 //获取关联通道
 @Override
 public final Channel channel() {
     return channel;
 }

 //获取通道处理对应的名称
 private String generateName(ChannelHandler handler) {
        //从通道处理器命名缓存,获取当前线程的通道处理器命名缓存
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();//获取处理器类型
        String name = cache.get(handlerType);//从缓冲获取通道处理器对应的名称
        if (name == null) {
	    //如果名称为空,则生成处理器名称,并放入缓存
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }

        // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
        // any name conflicts.  Note that we don't cache the names generated here.
	//简单是否存在name对应的上下文
        if (context0(name) != null) {
	    //存在,则重新命名
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }

下面是通道处理器上下文的写和刷新写任务线程定义,只贴出,后面将通道处理器上下文的时候再讲.
//AbstractChannelHandlerContext
abstract static class AbstractWriteTask implements Runnable {
    //在提交时是否估算任务size
    private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
            SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
    //任务负载
    // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
    private static final int WRITE_TASK_OVERHEAD =
            SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);

    private final Recycler.Handle<AbstractWriteTask> handle;
    private AbstractChannelHandlerContext ctx;
    private Object msg;
    private ChannelPromise promise;
    private int size;

    @SuppressWarnings("unchecked")
    private AbstractWriteTask(Recycler.Handle<? extends AbstractWriteTask> handle) {
        this.handle = (Recycler.Handle<AbstractWriteTask>) handle;
    }
    //初始化写任务
    protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
                               Object msg, ChannelPromise promise) {
        task.ctx = ctx;
        task.msg = msg;
        task.promise = promise;

        if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
	        //获取Outbound buf,从上下文关联通道的unsafe获取
            ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();

            // Check for null as it may be set to null if the channel is closed already
            if (buffer != null) {
                task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
                buffer.incrementPendingOutboundBytes(task.size);
            } else {
                task.size = 0;
            }
        } else {
            task.size = 0;
        }
    }

    @Override
    public final void run() {
        try {
            ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
            // Check for null as it may be set to null if the channel is closed already
            if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) {
                buffer.decrementPendingOutboundBytes(size);
            }
		//写消息
            write(ctx, msg, promise);
        } finally {
            // Set to null so the GC can collect them directly
            ctx = null;
            msg = null;
            promise = null;
            handle.recycle(this);
        }
    }
   //委托给上下文
    protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ctx.invokeWrite(msg, promise);
    }
}

static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {

    private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
        @Override
        protected WriteTask newObject(Handle<WriteTask> handle) {
            return new WriteTask(handle);
        }
    };
    //创建写任务
    private static WriteTask newInstance(
            AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        WriteTask task = RECYCLER.get();
	//初始化写任务
        init(task, ctx, msg, promise);
        return task;
    }

    private WriteTask(Recycler.Handle<WriteTask> handle) {
        super(handle);
    }
}

static final class WriteAndFlushTask extends AbstractWriteTask {

    private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
        @Override
        protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
            return new WriteAndFlushTask(handle);
        }
    };
    //创建写刷新任务
    private static WriteAndFlushTask newInstance(
            AbstractChannelHandlerContext ctx, Object msg,  ChannelPromise promise) {
        WriteAndFlushTask task = RECYCLER.get();
	//初始化写任务
        init(task, ctx, msg, promise);
        return task;
    }

    private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
        super(handle);
    }

    @Override
    public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        super.write(ctx, msg, promise);
	//调用上下文flush方法
        ctx.invokeFlush();
    }
}

从上面可以看出管道的写消息和写刷新消息操作,当事件执行器线程不在当前事务中时,
则创建写任务WriteTask或写刷新任务WriteAndFlushTask线程,并交由事件执行器执行。
这两任务的定义在抽象上下文的内部,两种任务的写请求实际为委托给关联上下文的invokeWrite方法,
而对于写刷新任务,在调用关联上下文的invokeWrite方法后,并调用invokeFlush方法。

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics