`
Donald_Draper
  • 浏览: 952641 次
社区版块
存档分类
最新评论

netty 默认Channel管道线-通道处理器移除与替换

阅读更多
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
netty 默认Channel管道线-添加通道处理器:http://donald-draper.iteye.com/blog/2388726
引言:
上一篇文章我们看了Channel管道线添加通道处理器的过程,先来回顾一下:
添加通道处理器到管道头部,首次检查通道处理器是否为共享模式,如果非共享,且已添加,则抛出异常;检查通道处理器名在管道内,是否存在对应通道处理器上下文,已存在抛出异常;根据事件执行器,处理器名,及处理器,构造处理器上下文;添加处理器上限文到管道上下文链头;如果通道没有注册到事件循环,上下文添加到管道时,创建添加通道处理器回调任务,并将任务添加管道的回调任务链中,当通道注册到事件循环时,触发通道处理器的handlerAdded事件,已注册则创建一个线程,用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加,并交由事件执行器执行;最好调用callHandlerAdded0方法,确保调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加。其他last(添加到管道尾部),before(添加指定上下文的前面),after(添加指定上下文的后面)操作,基本上与addfirst思路基本相同,不同的是添加到管道上下文链的位置。
今天来看一下移除通道处理器:
移除指定类型的通道处理器
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    //先获取通道处理器对应的上下文,在委托给上下文移除方法
    remove(getContextOrDie(handler));
    return this;
}
//获取处理器对应的上下文
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    //实际获取上下文方法
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {
        if (ctx == null) {
            return null;
        }
	//上下文处理器句柄与handler相同,则返回
        if (ctx.handler() == handler) {
            return ctx;
        }
        ctx = ctx.next;
    }
}



//根据名字移除通道处理
@Override
public final ChannelHandler remove(String name) {
    return remove(getContextOrDie(name)).handler();
}

 private AbstractChannelHandlerContext getContextOrDie(String name) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
        if (ctx == null) {
            throw new NoSuchElementException(name);
        } else {
            return ctx;
        }
}

@Override
   public final ChannelHandlerContext context(String name) {
       if (name == null) {
           throw new NullPointerException("name");
       }
       return context0(name);
   }

private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        //上下文名与指定的处理器名相同的返回上下文
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}


@SuppressWarnings("unchecked")
//移除指定参数类型处理器
@Override
public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
    return (T) remove(getContextOrDie(handlerType)).handler();
}


private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
     AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
     if (ctx == null) {
         throw new NoSuchElementException(handlerType.getName());
     } else {
         return ctx;
     }
 }

@Override
public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
    if (handlerType == null) {
        throw new NullPointerException("handlerType");
    }

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {
        if (ctx == null) {
            return null;
        }
	//如果上下文关联的处理器类型为handlerType,则返回上下文
        if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
            return ctx;
        }
        ctx = ctx.next;
    }
}

上面的方法没有什么好说的,很简单。
从上面可以看出无论是,无论是根据名称,处理器句柄,还是根据类型,都是首先获取对应的
处理器上下文,
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    //断言移除的上下文既不是头部也不是尾部
    assert ctx != head && ctx != tail;
    synchronized (this) {
        //移除上下文
        remove0(ctx);
        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we remove the context from the pipeline and add a task that will call
        // ChannelHandler.handlerRemoved(...) once the channel is registered.
        if (!registered) {
	    //如果通道已经从事件循环反注册,则添加移除回调任务
            callHandlerCallbackLater(ctx, false);
            return ctx;
        }
	//获取通道处理器上下文事件执行器
        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
	   //如果事件执行器不在当前事务循环中,则创建线程执行通道处理器移除相关事件及上下文状态更新,
	   //线程委托给上下文事件执行器
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx;
        }
    }
    //
    callHandlerRemoved0(ctx);
    return ctx;
}

移除通道上下文方法,有几点要关注
1.
//移除上下文
remove0(ctx);

private static void remove0(AbstractChannelHandlerContext ctx) {
     AbstractChannelHandlerContext prev = ctx.prev;
     AbstractChannelHandlerContext next = ctx.next;
     prev.next = next;
     next.prev = prev;
 }

2.
if (!registered) {
    //如果通道已经从事件循环反注册,则添加移除回调任务
    callHandlerCallbackLater(ctx, false);
    return ctx;
}

//添加移除上下文回调任务到,管道回调任务链
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
      assert !registered;
      PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
      PendingHandlerCallback pending = pendingHandlerCallbackHead;
      if (pending == null) {
          pendingHandlerCallbackHead = task;
      } else {
          // Find the tail of the linked-list.
          while (pending.next != null) {
              pending = pending.next;
          }
          pending.next = task;
      }
  }

private final class PendingHandlerRemovedTask extends PendingHandlerCallback {

        PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
	    //触发处理移除事件,并更新上下为状态为已移除状态
            callHandlerRemoved0(ctx);
        }
      
        @Override
        void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerRemoved0(ctx);
            } else {
	        //如果移除线程不在当前事务循环中,移除任务交给上下文关联的事件执行器
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
                                        " removing handler {}.", executor, ctx.name(), e);
                    }
                    // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
                    ctx.setRemoved();
                }
            }
        }
    }

3.
//触发处理移除事件,并更新上下为状态为已移除
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
      // Notify the complete removal.
      try {
          try {
	        //触发上下文关联的处理器handlerRemoved事件
              ctx.handler().handlerRemoved(ctx);
          } finally {
	        //更新上下文状态为已移除
              ctx.setRemoved();
          }
      } catch (Throwable t) {
          fireExceptionCaught(new ChannelPipelineException(
                  ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
      }
}

从上面可看出,实际移除通道处理器上下文为,首先从管道中移除对应的上下文,如果通道已经从事件循环反注册,
则添加移除回调任务到管道回调任务链,否则直接创建线程(触发上下文关联的处理器handlerRemoved事件,
更新上下文状态为已移除),有上下文关联的事件执行器执行。
有了上面的铺垫,下面两个方法应该很好理解
//移除管道头部的处理器
@Override
  public final ChannelHandler removeFirst() {
      if (head.next == tail) {
          throw new NoSuchElementException();
      }
      return remove(head.next).handler();
  }
移除管道尾部的处理器
  @Override
  public final ChannelHandler removeLast() {
      if (head.next == tail) {
          throw new NoSuchElementException();
      }
      return remove(tail.prev).handler();
}

上述中的头部与尾部非为傀儡节点,为实际上下文的第一个和最后一个元素。
再来看一下替换通道处理器操作操作:
@Override
public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
    replace(getContextOrDie(oldHandler), newName, newHandler);
    return this;
}
@Override
public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
    return replace(getContextOrDie(oldName), newName, newHandler);
}
@Override
@SuppressWarnings("unchecked")
public final <T extends ChannelHandler> T replace(
        Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
    return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
}

从上可以看无论是根据名称,处理器句柄,还是根据类型替换通道处理器,都是首先获取对应的处理器上下文,然后委托给replace(final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler)
方法。
private ChannelHandler replace(
        final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
    assert ctx != head && ctx != tail;
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(newHandler);
        if (newName == null) {
            newName = generateName(newHandler);
        } else {
            boolean sameName = ctx.name().equals(newName);
            if (!sameName) {
                checkDuplicateName(newName);
            }
        }
	//创建新处理器的上下文
        newCtx = newContext(ctx.executor, newName, newHandler);
	//用新的处理上下文,替换管道上下文链中原始旧的上下文
        replace0(ctx, newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we replace the context in the pipeline
        // and add a task that will call ChannelHandler.handlerAdded(...) and
        // ChannelHandler.handlerRemoved(...) once the channel is registered.
        if (!registered) {
	    //添加新上下文的添加回调任务
            callHandlerCallbackLater(newCtx, true);
	    //添加旧上下文的移除回调任务
            callHandlerCallbackLater(ctx, false);
            return ctx.handler();
        }
        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
	       //如果
                @Override
                public void run() {
		    //先添加,后移除,因为移除操作会触发channelRead和flush事件,而这些事件处理必须在handlerAdded事件后
                    // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
                    // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
                    // those event handlers must be called after handlerAdded().
		    //更新新上下文对应的处理器的handlerAdded,并更新,新上下状态为已添加
                    callHandlerAdded0(newCtx);
		    //更新旧上下文对应的处理器的handlerRemoved,并更新,新旧上下状态为已添加
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx.handler();
        }
    }
    // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
    // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
    // event handlers must be called after handlerAdded().
    callHandlerAdded0(newCtx);
    callHandlerRemoved0(ctx);
    return ctx.handler();
}
//替换上下文
private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = oldCtx.prev;
    AbstractChannelHandlerContext next = oldCtx.next;
    newCtx.prev = prev;
    newCtx.next = next;

    // Finish the replacement of oldCtx with newCtx in the linked list.
    // Note that this doesn't mean events will be sent to the new handler immediately
    // because we are currently at the event handler thread and no more than one handler methods can be invoked
    // at the same time (we ensured that in replace().)
    prev.next = newCtx;
    next.prev = newCtx;

    // update the reference to the replacement so forward of buffered content will work correctly
    //主要原始上下文的前驱和后继同时指向新上下文,以便转发剩余的buf内容
    oldCtx.prev = newCtx;
    oldCtx.next = newCtx;
}

从上面可以看出,其实替换上下文,就是添加新上下文到管道中原始上下文的位置,
并将原始上下文的前驱和后继同时指向新上下文,以便转发剩余的buf内容。
可以简单理解为添加新上下文,移除原始上下文,注意必须先添加,后移除,
因为移除操作会触发channelRead和flush事件,而这些事件处理必须在handlerAdded事件后

总结:

      无论是根据名称,处理器句柄,还是根据类型移除通道处理器,都是首先获取对应的处理器上下文,从管道中移除对应的上下文,如果通道已经从事件循环反注册,则添加移除回调任务到管道回调任务链,否则直接创建线程(触发上下文关联的处理器handlerRemoved事件,更新上下文状态为已移除),有上下文关联的事件执行器执行。
      无论是根据名称,处理器句柄,还是根据类型替换通道处理器,都是首先获取对应的
处理器上下文,然后添加新上下文到管道中原始上下文的位置,并将原始上下文的前驱和后继同时指向新上下文,以便转发剩余的buf内容;可以简单理解为添加新上下文,移除原始上下文,注意必须先添加,后移除,因为移除操作会触发channelRead和flush事件,而这些事件处理必须在handlerAdded事件后

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics