`
Donald_Draper
  • 浏览: 952629 次
社区版块
存档分类
最新评论

netty 默认Channel管道线-添加通道处理器

阅读更多
netty Inboudn/Outbound通道Invoker:http://donald-draper.iteye.com/blog/2388233
netty 异步任务-ChannelFuture:http://donald-draper.iteye.com/blog/2388297
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
引言:
上一篇文章我们看了默认Channel管道线初始化,先来回顾一下:
每个通道拥有一个Channel管道线;管道线用于管理,通道事件处理Handler ChannelHandler,管道线管理通道处理器的方式,为通道处理器器上下文模式,即每个通道处理器在管道中,是以通道上下文的形式存在;通道上下文关联一个通道处理器,通道上下文描述通道处理器的上下文,通道上下文拥有一个前驱和后继上下文,即通道上下文在管道线中是一个双向链表,通道处理器上下文通过inbound和oubound两个布尔标志,判断通道处理器是inbound还是outbound。上下文链表的头部为HeadContext,尾部为TailContext。
       头部上下文HeadContext的outbound的相关操作,直接委托给管道线所属通道的unsafe(Native API),inbound事件直接触发通道处理器上下文的相关事件,以便通道处理器上下文关联的通道Handler处理相关事件,但读操作实际是通过Channel读取。HeadContext的通道注册方法channelRegistered,主要是执行通道处理器添加回调任务链中的任务。处理器添加回调任务主要是触发触发上下文关联通道处理器的handlerAdded事件,更新上下文状态为添加完毕状态,如果过程中有异常发生,则移除通道上下文。channelUnregistered方法,主要是在通道从选择器反注册时,清空管道线程的通道处理器上下文,并触发上下文关联的通道处理器handlerRemoved事件,更新上下文状态为已移除。
       Channel的管道线的通道处理器上下文链的尾部TailContext是一个傀儡,不同于尾部上下文,头部上下文,在处理inbound事件时,触发通道处理器上下文相关的方法,在处理outbound事件时,委托给管道线关联的Channle的内部unsafe。
       默认Channel管道实现内部有两个回调任务PendingHandlerAdded/RemovedTask,一个是添加通道处理器上下文回调任务,一个是移除通道上下文回调任务,主要是触发上下文关联通道处理器的处理器添加移除事件,并更新相应的上下文状态为已添加或已移除。
       管道构造,主要是检查管道通道是否为空,初始化管道上下文链的头部与尾部上下文。
       netty通道处理器上下文可以说,是Mina中Hanlder和过滤器的集合,整合两者功能,管道线有点Mina过滤链的意味,HeadContext相当于Mina过滤链的头部过滤器,TailContext相当于Mina过滤链的尾部过滤器。
今天我们来看管道线如果添加和移除及替换通道处理器。
先来看添加通道处理器:
@Override
public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
   //根据事件执行器,处理器名及处理器,添加处理器到管道
    return addFirst(null, name, handler);
}
//根据事件执行器,处理器名及处理器,添加处理器到管道
@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        //检查处理器是否为共享模式
        checkMultiplicity(handler);
	//检查处理器名
        name = filterName(name, handler);
	//根据事件执行器,处理器名,及处理器,构造处理器上下文
        newCtx = newContext(group, name, handler);
	//添加处理上限文到管道上下文链
        addFirst0(newCtx);
        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
	//如果通道没有到事件循环,上下文添加到管道,需要添加一个回调任务,
	//当通道注册到事件循环时,触发通道处理器的handlerAdded事件
        if (!registered) {
            newCtx.setAddPending();//更新上下文状态为正在添加
	    //创建添加通道处理器回调任务,并将任务添加管道的回调任务链中
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
	//获取上下文的时间执行器
        EventExecutor executor = newCtx.executor();
	//如果事件执行器不在事件循环中
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();//更新上下文状态为正在添加
	    //创建一个线程,用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    //调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加
    callHandlerAdded0(newCtx);
    return this;
}

添加通道处理器方法有一下几点要看:
1.
 //检查处理器是否为共享模式
checkMultiplicity(handler);

//检查处理器是否为共享模式
 private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
	    //如果非共享,且已添加,则抛出异常
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

2.
//检查处理器名
 name = filterName(name, handler);


private String filterName(String name, ChannelHandler handler) {
      if (name == null) {
          //如果处理器名为空,则产生处理器对应的名称
          return generateName(handler);
      }
      //否者检查处理器名是否与管道内的处理器名是否相同
      checkDuplicateName(name);
      return name;
}

private void checkDuplicateName(String name) {
       if (context0(name) != null) {
            //如果处理器名已经存在,则抛出异常
           throw new IllegalArgumentException("Duplicate handler name: " + name);
       }
}

private AbstractChannelHandlerContext context0(String name) {
     AbstractChannelHandlerContext context = head.next;
     //检查管道内的处理器上下文是否存在与name相同的上下文
     while (context != tail) {
         if (context.name().equals(name)) {
             return context;
         }
         context = context.next;
     }
     return null;
}

3.
//根据事件执行器,处理器名,及处理器,构造处理器上下文
 newCtx = newContext(group, name, handler);

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

//这个上一篇文章已说,我们把它,贴过了:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    private final ChannelHandler handler;
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }
    @Override
    public ChannelHandler handler() {
        return handler;
    }
    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }
    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

默认的通达处理器关联一个通道处理器ChannelHandler。
//AbstractChannelHandlerContext
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
    volatile AbstractChannelHandlerContext next;//通道处理器上下文后继
    volatile AbstractChannelHandlerContext prev;//通道处理器上下文前驱
 /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
     */
    private static final int ADD_PENDING = 1;//添加状态
    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
     */
    private static final int ADD_COMPLETE = 2;//添加完成状态
    /**
     * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    private static final int REMOVE_COMPLETE = 3;//移除完成状态
    /**
     * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
     * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    private static final int INIT = 0;//初始化状态

    private final boolean inbound;//是否为inbound处理器上下文
    private final boolean outbound;//是否为outbound处理器上下文
    private final DefaultChannelPipeline pipeline;//上下文关联管道线
    private final String name;//处理器上下文名
    private final boolean ordered;//事件执行器是否为顺序执行器

    // Will be set to null if no child executor should be used, otherwise it will be set to the
    // child executor.
    final EventExecutor executor;//事件执行器
    private ChannelFuture succeededFuture;

    // Lazily instantiated tasks used to trigger events to a handler with different executor.
    // There is no need to make this volatile as at worse it will just create a few more instances then needed.
    private Runnable invokeChannelReadCompleteTask;
    private Runnable invokeReadTask;
    private Runnable invokeChannelWritableStateChangedTask;
    private Runnable invokeFlushTask;

    private volatile int handlerState = INIT;
 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
}

4.
//添加处理器上限文到管道上下文链头
private void addFirst0(AbstractChannelHandlerContext newCtx) {
    //获取上下文链头
    AbstractChannelHandlerContext nextCtx = head.next;
    newCtx.prev = head;
    newCtx.next = nextCtx;
    head.next = newCtx;
    nextCtx.prev = newCtx;
}

5.
// If the registered is false it means that the channel was not registered on an eventloop yet.
     // In this case we add the context to the pipeline and add a task that will call
     // ChannelHandler.handlerAdded(...) once the channel is registered.
	//如果通道没有注册到事件循环,上下文添加到管道,需要添加一个回调任务,
	//当通道注册到事件循环时,触发通道处理器的handlerAdded事件
     if (!registered) {
         newCtx.setAddPending();//更新上下文状态为正在添加
	 //创建添加通道处理器回调任务,并将任务添加管道的回调任务链中
         callHandlerCallbackLater(newCtx, true);
         return this;
 }

先来看状态更新
//AbstractChannelHandlerContext
 final void setAddPending() {
        boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
        assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
}


 //创建添加通道处理器回调任务,并将任务添加管道的回调任务链中
callHandlerCallbackLater(newCtx, true);


private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
      assert !registered;
      PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
      PendingHandlerCallback pending = pendingHandlerCallbackHead;
      if (pending == null) {
          pendingHandlerCallbackHead = task;
      } else {
          // Find the tail of the linked-list.
          while (pending.next != null) {
              pending = pending.next;
          }
          pending.next = task;
      }
}

方法很容易理解,主要是根据added参数,确定是添加还是移除任务,并创建相应的回调任务,添加到管道的回调任务列表

这一贴出上一篇的添加和移除回调任务以便理解回调任务的作用:
//PendingHandlerAddedTask 处理器添加回调任务
private final class PendingHandlerAddedTask extends PendingHandlerCallback {

    PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }
    @Override
    public void run() {
        callHandlerAdded0(ctx);
    }
    //在通道注册到选择器时,调用
    @Override
    void execute() {
        //获取通道上下文的事件执行器
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
	    //如果当前执行器在事务循环中直接委托为callHandlerAdded0
            callHandlerAdded0(ctx);
        } else {
            try {
	        //否则执行器,直接执行处理器添加回调任务
                executor.execute(this);
            } catch (RejectedExecutionException e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                            executor, ctx.name(), e);
                }
		//异常则移除通道处理器上下文
                remove0(ctx);
                ctx.setRemoved();//标志为移除
            }
        }
    }
}

6.
 //获取上下文的时间执行器
  EventExecutor executor = newCtx.executor();
	//如果事件执行器不在事件循环中
  if (!executor.inEventLoop()) {
      newCtx.setAddPending();//更新上下文状态为正在添加
      //创建一个线程,用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加
      executor.execute(new Runnable() {
          @Override
          public void run() {
              callHandlerAdded0(newCtx);
          }
      });
      return this;
  }


private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        //触发上下文关联通道处理器的handlerAdded事件
        ctx.handler().handlerAdded(ctx);
	//更新上下文状态为添加完毕
        ctx.setAddComplete();
    } catch (Throwable t) {
        //异常发生移除通道上下文
        boolean removed = false;
        try {
	   //移除上下文
            remove0(ctx);
            try {
	        //触发通道处理器的handlerRemoved事件
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();//标记为已移除
            }
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }

        if (removed) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; removed.", t));
        } else {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; also failed to remove.", t));
        }
    }
}


7.
 //调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加
 callHandlerAdded0(newCtx);

从上面可以看出,添加通道处理器,首次检查通道处理器是否为共享模式,如果非共享,且已添加,则抛出异常;检查通道处理器名在管道内,是否存在对应通道处理器上下文,已存在抛出异常;根据事件执行器,处理器名,及处理器,构造处理器上下文;
添加处理器上限文到管道上下文链;如果通道没有注册到事件循环,上下文添加到管道时,创建添加通道处理器回调任务,并将任务添加管道的回调任务链中,当通道注册到事件循环时,触发通道处理器的handlerAdded事件,已注册则创建一个线程,
用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加,并交由事件执行器执行;最好调用callHandlerAdded0方法,确保调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加。

再来看添加处理器器到管道尾部操作:
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        //与addFirst方法不同点,添加通道处理器上下文到管道上下文链尾
        addLast0(newCtx);
        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

//与addFirst方法不同点,添加通道处理器上下文到管道上下文链尾
 addLast0(newCtx);

private void addLast0(AbstractChannelHandlerContext newCtx) {
      //获取上下文链尾
       AbstractChannelHandlerContext prev = tail.prev;
       newCtx.prev = prev;
       newCtx.next = tail;
       prev.next = newCtx;
       tail.prev = newCtx;
}



再看添加处理器到指定处理前面
 @Override
    public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
        return addBefore(null, baseName, name, handler);
    }

    @Override
    public final ChannelPipeline addBefore(
            EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        final AbstractChannelHandlerContext ctx;
        synchronized (this) {
            checkMultiplicity(handler);
            name = filterName(name, handler);
	    //获取指定处理器的上下文
            ctx = getContextOrDie(baseName);
            newCtx = newContext(group, name, handler);
           //添加处理器到指定上下文的前面
            addBefore0(ctx, newCtx);
            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }


//获取指定处理器的上下文
  ctx = getContextOrDie(baseName);


private AbstractChannelHandlerContext getContextOrDie(String name) {
       //获取name对应的通道处理器上下文
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
        if (ctx == null) {
            throw new NoSuchElementException(name);
        } else {
            return ctx;
        }
    }

       @Override
    public final ChannelHandlerContext context(String name) {
        if (name == null) {
            throw new NullPointerException("name");
        }
        //委托给context0
        return context0(name);
    }

  
 
 private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
	//遍历管道上下文链,找到name对应的上下文
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }



 //添加处理器到指定上下文的前面
  addBefore0(ctx, newCtx);


   private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
        newCtx.prev = ctx.prev;
        newCtx.next = ctx;
        ctx.prev.next = newCtx;
        ctx.prev = newCtx;
    }


再来看addAfter操作:
  @Override
    public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
        return addAfter(null, baseName, name, handler);
    }

    @Override
    public final ChannelPipeline addAfter(
            EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        final AbstractChannelHandlerContext ctx;

        synchronized (this) {
            checkMultiplicity(handler);
            name = filterName(name, handler);
	    //获取baseName对应的上下文
            ctx = getContextOrDie(baseName);

            newCtx = newContext(group, name, handler);
            //不同点,在这
            addAfter0(ctx, newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we remove the context from the pipeline and add a task that will call
            // ChannelHandler.handlerRemoved(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

    private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
        newCtx.prev = ctx;
        newCtx.next = ctx.next;
        ctx.next.prev = newCtx;
        ctx.next = newCtx;
    }



总结:

添加通道处理器到管道头部,首次检查通道处理器是否为共享模式,如果非共享,且已添加,则抛出异常;检查通道处理器名在管道内,是否存在对应通道处理器上下文,已存在抛出异常;根据事件执行器,处理器名,及处理器,构造处理器上下文;添加处理器上限文到管道上下文链头;如果通道没有注册到事件循环,上下文添加到管道时,创建添加通道处理器回调任务,并将任务添加管道的回调任务链中,当通道注册到事件循环时,触发通道处理器的handlerAdded事件,已注册则创建一个线程,用于调用通道处理器的handlerAdded事件方法,及更新上下文状态为已添加,并交由事件执行器执行;最好调用callHandlerAdded0方法,确保调用通道处理器的handlerAdded事件方法,更新上下文状态为已添加。其他last(添加到管道尾部),before(添加指定上下文的前面),after(添加指定上下文的后面)操作,基本上与addfirst思路基本相同,不同的是添加到管道上下文链的位置。
附:
以下方法很简单,了解一下即可
//添加多个通道处理器到管道的头部
 @Override
public final ChannelPipeline addFirst(ChannelHandler... handlers) {
    return addFirst(null, handlers);
}

@Override
public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }
    //检查处理器数组长度
    if (handlers.length == 0 || handlers[0] == null) {
        return this;
    }

    int size;
    //检查处理器数组中的元素是否为null
    for (size = 1; size < handlers.length; size ++) {
        if (handlers[size] == null) {
            break;
        }
    }
    //遍历处理器数组,添加处理器到管道
    for (int i = size - 1; i >= 0; i --) {
        ChannelHandler h = handlers[i];
        addFirst(executor, null, h);
    }

    return this;
}
//添加多个通道处理器到管道的尾部
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics