`
Donald_Draper
  • 浏览: 945650 次
社区版块
存档分类
最新评论

Mina Io处理器抽象实现

    博客分类:
  • Mina
阅读更多
Mina 过滤链抽象实现:http://donald-draper.iteye.com/blog/2376335
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
在上面这篇文章中,当会话发送消息后,消息被过滤链上的过滤器过滤,从链尾到链头,过程如下:
//消息发送,Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,handler并不处理会话事件)
 
public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
        Entry tail = this.tail;
        callPreviousFilterWrite(tail, session, writeRequest);
    }
private void callPreviousFilterWrite(Entry entry, IoSession session,
            WriteRequest writeRequest) {
        try {
            entry.getFilter().filterWrite(entry.getNextFilter(), session,
                    writeRequest);
        } catch (Throwable e) {
            writeRequest.getFuture().setWritten(false);
            fireExceptionCaught(session, e);
        }
    }

再来看一下过滤链头HeadFilter
//HeadFilter
private class HeadFilter extends IoFilterAdapter {
       ...
        public void filterWrite(NextFilter nextFilter, IoSession session,
                WriteRequest writeRequest) throws Exception {
            if (session.getTransportType().getEnvelopeType().isAssignableFrom(
                    writeRequest.getMessage().getClass())) {
                doWrite(session, writeRequest);
            } else {
                throw new IllegalStateException(
                        "Write requests must be transformed to "
                                + session.getTransportType().getEnvelopeType()
                                + ": " + writeRequest);
            }
        }
       ...
}

从HeadFilter的定义来看,HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;
有两个方法有所不同:
//HeadFilter
//会话写操作
public void filterWrite(NextFilter nextFilter, IoSession session,
        WriteRequest writeRequest) throws Exception {
    if (session.getTransportType().getEnvelopeType().isAssignableFrom(
            writeRequest.getMessage().getClass())) {
        doWrite(session, writeRequest);
    } else {
        throw new IllegalStateException(
                "Write requests must be transformed to "
                        + session.getTransportType().getEnvelopeType()
                        + ": " + writeRequest);
    }
}

//AbstractIoFilterChain,待子类扩展
protected abstract void doWrite(IoSession session, WriteRequest writeRequest)
            throws Exception;

再来看SocketFilterChain
class SocketFilterChain extends AbstractIoFilterChain {

    SocketFilterChain(IoSession parent) {
        super(parent);
    }

    protected void doWrite(IoSession session, WriteRequest writeRequest) {
        SocketSessionImpl s = (SocketSessionImpl) session;
	//获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲
        Queue writeRequestQueue = s.getWriteRequestQueue();

        // SocketIoProcessor.doFlush() will reset it after write is finished
        // because the buffer will be passed with messageSent event. 
	//这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件,
	//待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置
        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
        buffer.mark();
        int remaining = buffer.remaining();
        if (remaining == 0) {
	    //BaseIoSession
	    // private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
            //更新调度请求计数器+1
            s.increaseScheduledWriteRequests();            
        } else {
	    //BaseIoSession
	    //private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
	    //更新调度写字节计数器+buffer.remaining()
            s.increaseScheduledWriteBytes(buffer.remaining());
        }

        synchronized (writeRequestQueue) {
	   //将写请求添加到session写请求队列中
            writeRequestQueue.push(writeRequest);
        }
        //如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作,这个在以后在具体详说
        if (session.getTrafficMask().isWritable()) {
            s.getIoProcessor().flush(s);
        }
    }
    //关闭会话
    protected void doClose(IoSession session) throws IOException {
        SocketSessionImpl s = (SocketSessionImpl) session;
        s.getIoProcessor().remove(s);//委托给session关联的IoProcessor
    }
}

从上面可以看出会话发送消息最后由会话IoProcessor处理,下面来看一下IoProcessor接口的定义:
/**
 * An internal interface to represent an 'I/O processor' that performs
 * actual I/O operations for {@link IoSession}s.  It abstracts existing
 * reactor frameworks such as Java NIO once again to simplify transport
 * implementations.
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 * 
 * @param <S> the type of the {@link IoSession} this processor can handle
 */
public interface IoProcessor<S extends IoSession> {

    /**
     * Releases any resources allocated by this processor.  Please note that 
     * the resources might not be released as long as there are any sessions
     * managed by this processor.  Most implementations will close all sessions
     * immediately and release the related resources.
     释放所有分配给IO处理器的资源。只要有任何会话占用资源,资源也许不会被Io处理器释放。
     在大部分的实现版本中,是立刻关闭所有会话,释放相关资源。
     */
    void dispose();
      /**
     * @return <tt>true</tt> if and if only {@link #dispose()} method has
     * been called.  Please note that this method will return <tt>true</tt>
     * even after all the related resources are released.
     如果dispose已经被调用,则返回true。即使在所有相关资源释放后,此方法仍返回true
     */
    boolean isDisposing();

    /**
     * @return <tt>true</tt> if and if only all resources of this processor
     * have been disposed.
     Io处理器所有资源释放完,则返回true。
     */
    boolean isDisposed();


    /**
     * Adds the specified {@code session} to the I/O processor so that
     * the I/O processor starts to perform any I/O operations related
     * with the {@code session}.
     * 添加会话到Io处理器,以便Io处理器启动时,执行会话相关的操作。
     * @param session The added session
     */
    void add(S session);

    /**
     * Flushes the internal write request queue of the specified
     * {@code session}.
     * 刷新会话内部的写请求队列
     * @param session The session we want the message to be written
     */
    void flush(S session);

    /**
     * Writes the WriteRequest for the specified {@code session}.
     * 发送写请求到会话
     * @param session The session we want the message to be written
     * @param writeRequest the WriteRequest to write
     */
    void write(S session, WriteRequest writeRequest);

    /**
     * Controls the traffic of the specified {@code session} depending of the
     * {@link IoSession#isReadSuspended()} and {@link IoSession#isWriteSuspended()}
     * flags
     * 依赖于会话的IoSession#isReadSuspended/isWriteSuspended标志控制session读写请求的次序
     * @param session The session to be updated
     */
    void updateTrafficControl(S session);

    /**
     * Removes and closes the specified {@code session} from the I/O
     * processor so that the I/O processor closes the connection
     * associated with the {@code session} and releases any other related
     * resources.
     * 从Io处理器移除和关闭会话,以便Io处理器关闭连接关联的会话,释放相关的资源。
     * @param session The session to be removed
     */
    void remove(S session);
}

下面来看Io处理器的抽象实现AbstractPollingIoProcessor
/**
 * An abstract implementation of {@link IoProcessor} which helps transport
 * developers to write an {@link IoProcessor} easily. This class is in charge of
 * active polling a set of {@link IoSession} and trigger events when some I/O
 * operation is possible.
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 * @param <S>
 *            the type of the {@link IoSession} this processor can handle
 */
public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
    /** A logger for this class */
    private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
    /**
     * A timeout used for the select, as we need to get out to deal with idle
     * sessions.选择操作超时时间,我们需要处理空闲的会话
     */
    private static final long SELECT_TIMEOUT = 1000L;
    /** A map containing the last Thread ID for each class */每个class的最后一个线程id
    private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>();
    /** This IoProcessor instance name处理器实例名 */
    private final String threadName;
    /** The executor to use when we need to start the inner Processor */
    private final Executor executor;//处理器内部执行器,用于运行内部处理器Processor
    /** A Session queue containing the newly created sessions */
    private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();//创建会话队列
    /** A queue used to store the sessions to be removed */
    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();//移除会话队列
    /** A queue used to store the sessions to be flushed */
    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();//刷新会话队列
    /**
     * A queue used to store the sessions which have a trafficControl to be
     * updated
     */
    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>();//次序控制会话队列
    /** The processor thread : it handles the incoming messages 处理器线程,用于处理进来的消息*/
    private final AtomicReference<Processor> processorRef = new AtomicReference<>();
    private long lastIdleCheckTime;//上次空闲检查时间
    private final Object disposalLock = new Object();//关闭锁
    private volatile boolean disposing;//是否正在关闭
    private volatile boolean disposed;//是否已关闭
    private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);//关闭结果
    protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);//这个暂时清楚,后面遇见时再说
 }

从上面可以看出抽象Io处理器,主要几个关键内部变量为选择操作超时时间SELECT_TIMEOUT,用于腾出时间,处理空闲的会话;executor处理器内部执行器,用于运行内部处理器Processor;存储Io处理器等线程最大线程id的threadIds(Map);创建会话队列newSessions用于存储新创建的会话;移除会话队列removingSessions用于存放从处理器移除的会话;刷新会话队列flushingSessions,用于存放要发送写请求的会话;次序控制会话队列trafficControllingSessions用于存放会话暂定读写的会话;;正在处理进来消息的处理器引用processorRef;这些变量暂时这么理解,后面如果发现错误,再更正。
来看构造:
 /**
     * Create an {@link AbstractPollingIoProcessor} with the given
     * {@link Executor} for handling I/Os events.
     * 根据给定的执行器,创建抽象Io处理器用于处理IO事件
     * @param executor
     *            the {@link Executor} for handling I/O events
     */
    protected AbstractPollingIoProcessor(Executor executor) {
        if (executor == null) {
            throw new IllegalArgumentException("executor");
        }
        this.threadName = nextThreadName();//获取处理器线程名
        this.executor = executor;
    }
    /**
     * Compute the thread ID for this class instance. As we may have different
     * classes, we store the last ID number into a Map associating the class
     * name to the last assigned ID.
     * 计算类型实例最大线程id。因为我们有不同的类型,所以我们存在类型与类型实例最后一个id的映射
     关系放在Map中管理。
     * @return a name for the current thread, based on the class name and an
     *         incremental value, starting at 1.
     */
    private String nextThreadName() {
        Class<?> cls = getClass();
        int newThreadId;
	//从类型最大线程id映射Map,获取Io处理器线程的最大线程id
        AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
        if (threadId == null) {
            newThreadId = 1;
        } else {
            // Just increment the last ID, and get it.
            newThreadId = threadId.incrementAndGet();
        }
        // Now we can compute the name for this thread
        return cls.getSimpleName() + '-' + newThreadId;
    }

来看其他方法定义
/**
     * poll those sessions for the given timeout
     * 超时选择
     * @param timeout
     *            milliseconds before the call timeout if no event appear
     * @return The number of session ready for read or for write
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract int select(long timeout) throws Exception;

    /**
     * poll those sessions forever
     * 选择操作
     * @return The number of session ready for read or for write
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract int select() throws Exception;

    /**
     * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
     * is empty
     * 判断处理器的会话集合是否为空
     * @return <tt>true</tt> if at least a session is managed by this
     *         {@link IoProcessor}
     */
    protected abstract boolean isSelectorEmpty();


    /**
     * Get an {@link Iterator} for the list of {@link IoSession} polled by this
     * {@link IoProcessor}
     * 返回处理器选择的所有会话
     * @return {@link Iterator} of {@link IoSession}
     */
    protected abstract Iterator<S> allSessions();

    /**
     * Get an {@link Iterator} for the list of {@link IoSession} found selected
     * by the last call of {@link #select(long)}
     * 获取上次调用超时选择后的会话集
     * @return {@link Iterator} of {@link IoSession} read for I/Os operation
     */
    protected abstract Iterator<S> selectedSessions();

    /**
     * Get the state of a session (One of OPENING, OPEN, CLOSING)
     * 获取会话状态
     * @param session
     *            the {@link IoSession} to inspect
     * @return the state of the session
     */
    protected abstract SessionState getState(S session);

    /**
     * Tells if the session ready for writing
     * 判断会话是否可写
     * @param session
     *            the queried session
     * @return <tt>true</tt> is ready, <tt>false</tt> if not ready
     */
    protected abstract boolean isWritable(S session);

    /**
     * Tells if the session ready for reading
     * 判断会话是否准备好读操作
     * @param session
     *            the queried session
     * @return <tt>true</tt> is ready, <tt>false</tt> if not ready
     */
    protected abstract boolean isReadable(S session);

    /**
     * Set the session to be informed when a write event should be processed
     * 当有一个写事件要处理时,是否通知会话
     * @param session
     *            the session for which we want to be interested in write events
     * @param isInterested
     *            <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception
     *             If there was a problem while registering the session
     */
    protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;

    /**
     * Set the session to be informed when a read event should be processed
     * 当有一个读事件要处理时,是否通知会话
     * @param session
     *            the session for which we want to be interested in read events
     * @param isInterested
     *            <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception
     *             If there was a problem while registering the session
     */
    protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;

    /**
     * Tells if this session is registered for reading
     * 判断会话是否注册读事件
     * @param session
     *            the queried session
     * @return <tt>true</tt> is registered for reading
     */
    protected abstract boolean isInterestedInRead(S session);

    /**
     * Tells if this session is registered for writing
     * 判断会话是否注册写事件
     * @param session
     *            the queried session
     * @return <tt>true</tt> is registered for writing
     */
    protected abstract boolean isInterestedInWrite(S session);

    /**
     * Initialize the polling of a session. Add it to the polling process.
     * 初始化会话,添加到处理器
     * @param session
     *            the {@link IoSession} to add to the polling
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract void init(S session) throws Exception;

    /**
     * Destroy the underlying client socket handle
     * 关闭底层客户端socket
     * @param session
     *            the {@link IoSession}
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract void destroy(S session) throws Exception;

    /**
     * Reads a sequence of bytes from a {@link IoSession} into the given
     * {@link IoBuffer}. Is called when the session was found ready for reading.
     * 当会话准备好读操作是,从会话读字节序列
     * @param session
     *            the session to read
     * @param buf
     *            the buffer to fill
     * @return the number of bytes read
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract int read(S session, IoBuffer buf) throws Exception;

    /**
     * Write a sequence of bytes to a {@link IoSession}, means to be called when
     * a session was found ready for writing.
     * 当会话准备好写操作是,写字节序列到会话
     * @param session
     *            the session to write
     * @param buf
     *            the buffer to write
     * @param length
     *            the number of bytes to write can be superior to the number of
     *            bytes remaining in the buffer
     * @return the number of byte written
     * @throws IOException
     *             any exception thrown by the underlying system calls
     */
    protected abstract int write(S session, IoBuffer buf, int length) throws IOException;

    /**
     * Write a part of a file to a {@link IoSession}, if the underlying API
     * isn't supporting system calls like sendfile(), you can throw a
     * {@link UnsupportedOperationException} so the file will be send using
     * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
     * 写文件的某个Region到会话,如果底层API不支持sendfile方法,你可以抛出一个UnsupportedOperationException,
     那么将调用#write(AbstractIoSession, IoBuffer, int)发送文件。
     * @param session
     *            the session to write
     * @param region
     *            the file region to write
     * @param length
     *            the length of the portion to send
     * @return the number of written bytes
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;

来看添加会话到处理器
  
 /**
     * {@inheritDoc}
     */
   //会话创建时,添加回到到处理器,
    @Override
    public final void add(S session) {
        if (disposed || disposing) {//如果处理器已关闭,则抛出非法状态异常
            throw new IllegalStateException("Already disposed.");
        }
        // Adds the session to the newSession queue and starts the worker
	//添加会话到Io处理器的创建会话队列中
        newSessions.add(session);
         //启动一个Io处理器线程
        startupProcessor();
    }

    
 /**
     * Starts the inner Processor, asking the executor to pick a thread in its
     * pool. The Runnable will be renamed
     */
    private void startupProcessor() {
        //从处理器引用获取处理器
        Processor processor = processorRef.get();
        if (processor == null) {
	   //处理器为空,则创建一个
            processor = new Processor();
            if (processorRef.compareAndSet(null, processor)) {
	        //执行处理器
                executor.execute(new NamePreservingRunnable(processor, threadName));
            }
        }
        // Just stop the select() and start it again, so that the processor
        // can be activated immediately.
	//暂时停止选择操作,待处理器线程启动
        wakeup();
    }

/**
     * Interrupt the {@link #select(long)} call.
     中断选择操作
     */
    protected abstract void wakeup();

这个过程有两点要看
1.
 //处理器为空,则创建一个
 processor = new Processor();

2.
//执行处理器                                                         
executor.execute(new NamePreservingRunnable(processor, threadName));  


先来看处理器线程
1.
 //处理器为空,则创建一个
 processor = new Processor();

//Processor
  /**
  * The main loop. This is the place in charge to poll the Selector, and to
  * process the active sessions. It's done in - handle the newly created
  * sessions -
  */
 private class Processor implements Runnable {
     /**
      * {@inheritDoc}
      */
     @Override
     public void run() {
         //断言Io处理器实际处理线程是否为当前Processor
         assert processorRef.get() == this;
         int nSessions = 0;
         lastIdleCheckTime = System.currentTimeMillis();
         int nbTries = 10;
         for (;;) {
             try {
                 // This select has a timeout so that we can manage
                 // idle session when we get out of the select every
                 // second. (note : this is a hack to avoid creating
                 // a dedicated thread).
		 //选择操作有一个超时时间,以便当选择超时时,处理空闲会话
                 long t0 = System.currentTimeMillis();
		 //超时选择,select方法待子类实现
                 int selected = select(SELECT_TIMEOUT);
                 long t1 = System.currentTimeMillis();
                 long delta = t1 - t0;
                 //当前这次选择操作,没有SELECTKey相关事件,没有中断,且此次选择操作耗时
		 //小于SELECT_TIMEOUT(1000)/nbTries(10)
                 if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
                     // Last chance : the select() may have been
                     // interrupted because we have had an closed channel.
		     //在上次尝试选择操作时,可能通道关闭,选择操作可能被中断
                     if (isBrokenConnection()) {
		         //通道关闭的话,仅仅输出日志
                         LOG.warn("Broken connection");
                     } else {
                         // Ok, we are hit by the nasty epoll
                         // spinning.
                         // Basically, there is a race condition
                         // which causes a closing file descriptor not to be
                         // considered as available as a selected channel,
                         // but
                         // it stopped the select. The next time we will
                         // call select(), it will exit immediately for the
                         // same
                         // reason, and do so forever, consuming 100%
                         // CPU.
                         // We have to destroy the selector, and
                         // register all the socket on a new one.
                         if (nbTries == 0) {
			     //如果尝试次数用完我们,注册新的选择器
                             LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
                             registerNewSelector();
                             nbTries = 10;//恢复尝试次数
                         } else {
			     //否则尝试次数自减
                             nbTries--;
                         }
                     }
                 } else {
                     nbTries = 10;
                 }

                 // Manage newly created session first
		 //处理新会话
                 nSessions += handleNewSessions();
                 //更新会话状态
                 updateTrafficMask();

                 // Now, if we have had some incoming or outgoing events,
                 // deal with them
                 if (selected > 0) {
                     // LOG.debug("Processing ..."); // This log hurts one of
                     // the MDCFilter test...
		     //如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件
		     //读写事件,委托为process方法处理
                     process();
                 }

                 // Write the pending requests
		 //处理有些请求的会话
                 long currentTime = System.currentTimeMillis();
                 flush(currentTime);

                 // And manage removed sessions
		 //移除已关闭的会话
                 nSessions -= removeSessions();

                 // Last, not least, send Idle events to the idle sessions
		 //通知会话空闲事件
                 notifyIdleSessions(currentTime);

                 // Get a chance to exit the infinite loop if there are no
                 // more sessions on this Processor
		 //如果在这个过程中,激活会话最后为0,则清除处理器引用
                 if (nSessions == 0) {
                     processorRef.set(null);

                     if (newSessions.isEmpty() && isSelectorEmpty()) {
                         // newSessions.add() precedes startupProcessor
                         assert processorRef.get() != this;
                         break;
                     }

                     assert processorRef.get() != this;

                     if (!processorRef.compareAndSet(null, this)) {
                         // startupProcessor won race, so must exit processor
                         assert processorRef.get() != this;
                         break;
                     }

                     assert processorRef.get() == this;
                 }

                 // Disconnect all sessions immediately if disposal has been
                 // requested so that we exit this loop eventually.
		 //判断Io处理器是否正在关闭,如果正在关闭断开所有会话
                 if (isDisposing()) {
                     boolean hasKeys = false;
                     //获取当前处理器管理的会话,移除会话
                     for (Iterator<S> i = allSessions(); i.hasNext();) {
                         IoSession session = i.next();

                         if (session.isActive()) {
                             scheduleRemove((S) session);
                             hasKeys = true;
                         }
                     }

                     if (hasKeys) {
                         wakeup();
                     }
                 }
             } catch (ClosedSelectorException cse) {
                 // If the selector has been closed, we can exit the loop
                 // But first, dump a stack trace
                 ExceptionMonitor.getInstance().exceptionCaught(cse);
                 break;
             } catch (Exception e) {
                 ExceptionMonitor.getInstance().exceptionCaught(e);

                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e1) {
                     ExceptionMonitor.getInstance().exceptionCaught(e1);
                 }
             }
         }
         try {
             synchronized (disposalLock) {
                 if (disposing) {
		     //如果正在关闭则,完成实际关闭工作
                     doDispose();
                 }
             }
         } catch (Exception e) {
	     //捕捉异常
             ExceptionMonitor.getInstance().exceptionCaught(e);
         } finally {
	     //已关闭
             disposalFuture.setValue(true);
         }
}

Io处理器,处理线程Processor的实际工作有一下几点要看
a.
if (isBrokenConnection()) {
       //通道关闭的话,仅仅输出日志
       LOG.warn("Broken connection");
   } 

b.
else {
    // Ok, we are hit by the nasty epoll
    // spinning.
    // Basically, there is a race condition
    // which causes a closing file descriptor not to be
    // considered as available as a selected channel,
    // but
    // it stopped the select. The next time we will
    // call select(), it will exit immediately for the
    // same
    // reason, and do so forever, consuming 100%
    // CPU.
    // We have to destroy the selector, and
    // register all the socket on a new one.
    if (nbTries == 0) {
     //如果尝试次数用完我们,注册新的选择器
        LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
        registerNewSelector();
        nbTries = 10;//恢复尝试次数
    } else {
     //否则尝试次数自减
        nbTries--;
    }
}

c.
 // Manage newly created session first
 //处理新会话
 nSessions += handleNewSessions();

d.
//更新会话状态
updateTrafficMask();

e.
// Now, if we have had some incoming or outgoing events,
// deal with them
if (selected > 0) {
    // LOG.debug("Processing ..."); // This log hurts one of
    // the MDCFilter test...
    //如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件
    //读写事件,委托为process方法处理
    process();
}

f.
 // Write the pending requests
 //处理有些请求的会话
 long currentTime = System.currentTimeMillis();
 flush(currentTime);

g.
 // And manage removed sessions
//移除已关闭的会话
nSessions -= removeSessions();

h.
// Last, not least, send Idle events to the idle sessions
//通知会话空闲事件
notifyIdleSessions(currentTime);

i.
// Disconnect all sessions immediately if disposal has been
// requested so that we exit this loop eventually.
//判断Io处理器是否正在关闭,如果正在关闭断开所有会话
if (isDisposing()) {
    boolean hasKeys = false;
    //获取当前处理器管理的会话,移除会话
    for (Iterator<S> i = allSessions(); i.hasNext();) {
        IoSession session = i.next();

        if (session.isActive()) {
            scheduleRemove((S) session);
            hasKeys = true;
        }
    }
    if (hasKeys) {
        wakeup();
    }
}

j.
t
ry {
       synchronized (disposalLock) {
           if (disposing) {
   	     //如果正在关闭,完成实际关闭工作
               doDispose();
           }
       }
   } catch (Exception e) {
       //捕捉异常
       ExceptionMonitor.getInstance().exceptionCaught(e);
   } finally {
	     //已关闭
             disposalFuture.setValue(true);
}

下面我们分别来看这几点
a.
 if (isBrokenConnection()) {
       //通道关闭的话,仅仅输出日志
       LOG.warn("Broken connection");
   } 

  /**
     * Check that the select() has not exited immediately just because of a
     * broken connection. In this case, this is a standard case, and we just
     * have to loop.
     * 检查选择是否由于Io处理器连接断开,选择操作还没有退出
     * @return <tt>true</tt> if a connection has been brutally closed.
     * @throws IOException
     *             If we got an exception
     */
    protected abstract boolean isBrokenConnection() throws IOException;


b.
else {
    // Ok, we are hit by the nasty epoll
    // spinning.
    // Basically, there is a race condition
    // which causes a closing file descriptor not to be
    // considered as available as a selected channel,
    // but
    // it stopped the select. The next time we will
    // call select(), it will exit immediately for the
    // same
    // reason, and do so forever, consuming 100%
    // CPU.
    // We have to destroy the selector, and
    // register all the socket on a new one.
    if (nbTries == 0) {
     //如果尝试次数用完我们,注册新的选择器
        LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
        registerNewSelector();
        nbTries = 10;//恢复尝试次数
    } else {
     //否则尝试次数自减
        nbTries--;
    }
}

 /**
     * In the case we are using the java select() method, this method is used to
     * trash the buggy selector and create a new one, registring all the sockets
     * on it.
     * 丢弃旧的选择器,将所有socket注册到新的选择器上
     * @throws IOException
     *             If we got an exception
     */
    protected abstract void registerNewSelector() throws IOException;

c.
// Manage newly created session first
 //处理新会话
 nSessions += handleNewSessions();

 /**
    * Loops over the new sessions blocking queue and returns the number of
    * sessions which are effectively created
    * 遍历创建会话队列,返回新建会话的数量
    * @return The number of new sessions
    */
   private int handleNewSessions() {
       int addedSessions = 0;
       for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
           if (addNow(session)) {
               // A new session has been created
               addedSessions++;
           }
       }
       return addedSessions;
   }


/**
  * Process a new session : - initialize it - create its chain - fire the
  * CREATED listeners if any
  * 处理新会话,初始化会话,创建会话过滤链,触发监听器会话创建事件
  * @param session
  *            The session to create
  * @return <tt>true</tt> if the session has been registered
  */
 private boolean addNow(S session) {
     boolean registered = false;

     try {
        //初始化会话
         init(session);
         registered = true;
         // Build the filter chain of this session.
	 //获取会话service过滤链构建器
         IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
	 //构建会话过滤链
         chainBuilder.buildFilterChain(session.getFilterChain());
         // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
         // in AbstractIoFilterChain.fireSessionOpened().
         // Propagate the SESSION_CREATED event up to the chain
         IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
	 //触发会话事件
         listeners.fireSessionCreated(session);
     } catch (Exception e) {
         ExceptionMonitor.getInstance().exceptionCaught(e);

         try {
             destroy(session);
         } catch (Exception e1) {
             ExceptionMonitor.getInstance().exceptionCaught(e1);
         } finally {
             registered = false;
         }
     }
     return registered;
 }

添加会话有一下几点要看
c.1
//初始化会话
init(session);

protected abstract void init(AbstractIoSession abstractiosession)
        throws Exception;

c.2
//触发会话事件
listeners.fireSessionCreated(session);

//IoServiceListenerSupport
public void fireSessionCreated(IoSession session)
    {
        boolean firstSession = false;
        if(session.getService() instanceof IoConnector)
            synchronized(managedSessions)
            {
                firstSession = managedSessions.isEmpty();
            }
        if(managedSessions.putIfAbsent(Long.valueOf(session.getId()), session) != null)
            return;
        if(firstSession)
            fireServiceActivated();
	//触发会话过滤链会话创建和会话打开事件
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireSessionCreated();
        filterChain.fireSessionOpened();
        ...
}

c.3
 destroy(session);

/**
     * Destroy the underlying client socket handle
     * 关闭底层客户端socket
     * @param session
     *            the {@link IoSession}
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract void destroy(S session) throws Exception;

d.
//更新会话状态
updateTrafficMask()
;


 /**
  * Update the trafficControl for all the session.
  */
 private void updateTrafficMask() {
     int queueSize = trafficControllingSessions.size();
     while (queueSize > 0) {
         S session = trafficControllingSessions.poll();
         if (session == null) {
             // We are done with this queue.
             return;
         }
	 //获取会话状态
         SessionState state = getState(session);
         switch (state) {
         case OPENED:
	     //更新会话状态
             updateTrafficControl(session);
             break;
         case CLOSING:
             break;
         case OPENING:
             // Retry later if session is not yet fully initialized.
             // (In case that Session.suspend??() or session.resume??() is
             // called before addSession() is processed)
             // We just put back the session at the end of the queue.
	     //如果正在打开,则添加到次序控制会话队列
             trafficControllingSessions.add(session);
             break;
         default:
             throw new IllegalStateException(String.valueOf(state));
         }

         // As we have handled one session, decrement the number of
         // remaining sessions. The OPENING session will be processed
         // with the next select(), as the queue size has been decreased,
         // even
         // if the session has been pushed at the end of the queue
         queueSize--;
     }
 }

/**
 * {@inheritDoc}
 */
@Override
public void updateTrafficControl(S session) {
    //
    try {
        //通知读操作事件
        setInterestedInRead(session, !session.isReadSuspended());
    } catch (Exception e) {
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
    }
    try {
       //通知写操作事件
        setInterestedInWrite(session,
                !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
    } catch (Exception e) {
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
    }
}

 /**
     * Set the session to be informed when a write event should be processed
     * 当有一个写事件要处理时,是否通知会话
     * @param session
     *            the session for which we want to be interested in write events
     * @param isInterested
     *            <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception
     *             If there was a problem while registering the session
     */
    protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;


 
  /**
     * Set the session to be informed when a read event should be processed
     * 当有一个读事件要处理时,是否通知会话
     * @param session
     *            the session for which we want to be interested in read events
     * @param isInterested
     *            <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception
     *             If there was a problem while registering the session
     */
    protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;

e.
// Now, if we have had some incoming or outgoing events,
// deal with them
if (selected > 0) {
    // LOG.debug("Processing ..."); // This log hurts one of
    // the MDCFilter test...
    //如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件
    //读写事件,委托为process方法处理
    process();
}

 private void process() throws Exception {
     for (Iterator<S> i = selectedSessions(); i.hasNext();) {
         S session = i.next();
	 //处理会话
         process(session);
         i.remove();
     }
 }

/**
     * Get an {@link Iterator} for the list of {@link IoSession} found selected
     * by the last call of {@link #select(long)}
     * 获取上次调用超时选择后,准备就绪会话集
     * @return {@link Iterator} of {@link IoSession} read for I/Os operation
     */
    protected abstract Iterator<S> selectedSessions();


/**
     * Deal with session ready for the read or write operations, or both.
     */
    private void process(S session) {
        // Process Reads
        if (isReadable(session) && !session.isReadSuspended()) {
	    //如果会话可读,则读会话接收到的数据
            read(session);
        }

        // Process writes
        if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
            // add the session to the queue, if it's not already there
	    //如果会话有数据要发送,则将会话添加到刷新会话队列
            flushingSessions.add(session);
        }
    }
}
}

处理会话有两点要关注,
e.1
// Process Reads
  if (isReadable(session) && !session.isReadSuspended()) {
    //如果会话可读,则读会话接收到的数据
      read(session);
  }

private void read(S session) {
        //获取会话配置,会话配置读缓存size
        IoSessionConfig config = session.getConfig();
        int bufferSize = config.getReadBufferSize();
        IoBuffer buf = IoBuffer.allocate(bufferSize);
        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {
                    //从会话读取字节序列到buffer
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;

                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);

                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
	       //获取会话过滤链,触发过滤链消息接收事件MessageReceive
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            } else {
                // release temporary buffer when read nothing
                buf.free(); 
            }
            //如果会话socket关闭,则触发过滤链fireInputClosed
            if (ret < 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireInputClosed();
            }
        } catch (Exception e) {
            if ((e instanceof IOException) &&
                (!(e instanceof PortUnreachableException)
                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable())) {
                scheduleRemove(session);
            }
	    //触发过滤链异常事件ExceptionCaught
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }

e.2
// Process writes                                                                                   
if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {     
    // add the session to the queue, if it's not already there                                      
    //如果会话有数据要发送,则将会话添加到刷新会话队列                                              
    flushingSessions.add(session);  
}   

f.
 // Write the pending requests
 //处理有写请求的会话
 long currentTime = System.currentTimeMillis();
 flush(currentTime);


 
/**
  * Write all the pending messages
  */
 private void flush(long currentTime) {
     if (flushingSessions.isEmpty()) {
         return;
     }
     //遍历刷新会话队列
     do {
         S session = flushingSessions.poll(); // the same one with
                                              // firstSession
         if (session == null) {
             // Just in case ... It should not happen.
             break;
         }
         // Reset the Schedule for flush flag for this session,
         // as we are flushing it now
	 //设置会话刷新状态为未刷新
         session.unscheduledForFlush();
	 //获取会话状态
         SessionState state = getState(session);
         switch (state) {
         case OPENED:
             try {
	         //会话已已打开,则委托给flushNow
                 boolean flushedAll = flushNow(session, currentTime);
                 
                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
                         && !session.isScheduledForFlush()) {
		     //调度刷新会话
                     scheduleFlush(session);
                 }
             } catch (Exception e) {
                 scheduleRemove(session);//移除会话调度
                 session.closeNow();//异常立刻关闭会话
                 IoFilterChain filterChain = session.getFilterChain();
                 filterChain.fireExceptionCaught(e);
             }

             break;

         case CLOSING:
             // Skip if the channel is already closed.
             break;

         case OPENING:
             // Retry later if session is not yet fully initialized.
             // (In case that Session.write() is called before addSession()
             // is processed)
	     //如果正在会话正在打开,则调度刷新会话
             scheduleFlush(session);
             return;

         default:
             throw new IllegalStateException(String.valueOf(state));
         }

     } while (!flushingSessions.isEmpty());
 }

方法有以下几点要关注
f.1
 //会话已已打开,则委托给flushNow                                   
 boolean flushedAll = flushNow(session, currentTime); 



private boolean flushNow(S session, long currentTime) {
    //如果会话失去连接,则添加会话到移除会话队列
    if (!session.isConnected()) {
        scheduleRemove(session);
        return false;
    }
    final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
   //获取会话写请求队列
    final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
    // Set limitation for the number of written bytes for read-write
    // fairness. I used maxReadBufferSize * 3 / 2, which yields best
    // performance in my experience while not breaking fairness much.
    //写buffer最大size,经验值为maxReadBufferSize * 3 / 2
    final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
            + (session.getConfig().getMaxReadBufferSize() >>> 1);
    int writtenBytes = 0;
    WriteRequest req = null;
    try {
        // Clear OP_WRITE,清除会话写事件OP_WRITE标志
        setInterestedInWrite(session, false);

        do {
            // Check for pending writes.
	    //获取会话当前写情趣
            req = session.getCurrentWriteRequest();
            if (req == null) {
                req = writeRequestQueue.poll(session);

                if (req == null) {
                    break;
                }

                session.setCurrentWriteRequest(req);
            }
            int localWrittenBytes;
	    //获取写请求消息
            Object message = req.getMessage();
            if (message instanceof IoBuffer) {
	        //写会话buffer
                localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
                        currentTime);
                if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
                    // the buffer isn't empty, we re-interest it in writing
                    setInterestedInWrite(session, true);

                    return false;
                }
            } else if (message instanceof FileRegion) {
	        写文件
                localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
                        currentTime);

                // Fix for Java bug on Linux
                // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
                // If there's still data to be written in the FileRegion,
                // return 0 indicating that we need
                // to pause until writing may resume.
                if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
                    setInterestedInWrite(session, true);
                    return false;
                }
            } else {
                throw new IllegalStateException("Don't know how to handle message of type '"
                        + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
            }
            if (localWrittenBytes == 0) {

                // Kernel buffer is full.
                if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
                    setInterestedInWrite(session, true);
                    return false;
                }
            } else {
                writtenBytes += localWrittenBytes;

                if (writtenBytes >= maxWrittenBytes) {
                    // Wrote too much
                    scheduleFlush(session);
                    return false;
                }
            }

            if (message instanceof IoBuffer) {
                ((IoBuffer) message).free();
            }
        } while (writtenBytes < maxWrittenBytes);
    } catch (Exception e) {
       //写请求结果异常
        if (req != null) {
            req.getFuture().setException(e);
        }
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
        return false;
    }

    return true;
}


这一点有一下几点要看
f.1.1
 //写会话buffer
localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
        currentTime);

  private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
                throws Exception {
            IoBuffer buf = (IoBuffer) req.getMessage();
            int localWrittenBytes = 0;
            if (buf.hasRemaining()) {
                int length;

                if (hasFragmentation) {
                    length = Math.min(buf.remaining(), maxLength);
                } else {
                    length = buf.remaining();
                }
                try {
		    //发送会话数据
                    localWrittenBytes = write(session, buf, length);
                } catch (IOException ioe) {
                    // We have had an issue while trying to send data to the
                    // peer : let's close the session.
                    buf.free();
                    session.closeNow();
                    removeNow(session);

                    return 0;
                }
            }
            session.increaseWrittenBytes(localWrittenBytes, currentTime);
            // Now, forward the original message
            if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
                // Buffer has been sent, clear the current request.
                Object originalMessage = req.getOriginalRequest().getMessage();

                if (originalMessage instanceof IoBuffer) {
                    buf = (IoBuffer) req.getOriginalRequest().getMessage();

                    int pos = buf.position();
                    buf.reset();
                    fireMessageSent(session, req);
                    // And set it back to its position
                    buf.position(pos);
                } else {
                    fireMessageSent(session, req);
                }
            }

            return localWrittenBytes;
}

 /**
     * Write a sequence of bytes to a {@link IoSession}, means to be called when
     * a session was found ready for writing.
     * 当会话准备好写操作是,写字节序列到会话
     * @param session
     *            the session to write
     * @param buf
     *            the buffer to write
     * @param length
     *            the number of bytes to write can be superior to the number of
     *            bytes remaining in the buffer
     * @return the number of byte written
     * @throws IOException
     *             any exception thrown by the underlying system calls
     */
    protected abstract int write(S session, IoBuffer buf, int length) throws IOException;


f.1.2
//写文件                                                                                        
localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,  
        currentTime); 
   

//写文件
private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
        throws Exception {
    int localWrittenBytes;
    //获取写请求文件FileRegion
    FileRegion region = (FileRegion) req.getMessage();
    if (region.getRemainingBytes() > 0) {
        int length;

        if (hasFragmentation) {
            length = (int) Math.min(region.getRemainingBytes(), maxLength);
        } else {
            length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
        }
        //委托给transferFile
        localWrittenBytes = transferFile(session, region, length);
        region.update(localWrittenBytes);
    } else {
        localWrittenBytes = 0;
    }
    session.increaseWrittenBytes(localWrittenBytes, currentTime);
    if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
       //触发会话消息发送事件
        fireMessageSent(session, req);
    }
    return localWrittenBytes;
}

  
 /**
     * Write a part of a file to a {@link IoSession}, if the underlying API
     * isn't supporting system calls like sendfile(), you can throw a
     * {@link UnsupportedOperationException} so the file will be send using
     * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
     * 写文件的某个Region到会话,如果底层API不支持sendfile方法,你可以抛出一个UnsupportedOperationException,
     那么将调用#write(AbstractIoSession, IoBuffer, int)发送文件。
     * @param session
     *            the session to write
     * @param region
     *            the file region to write
     * @param length
     *            the length of the portion to send
     * @return the number of written bytes
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;

//触发会话消息发送事件
 private void fireMessageSent(AbstractIoSession session, WriteRequest req)
    {
        session.setCurrentWriteRequest(null);
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireMessageSent(req);
    }

f.2   
//如果刷新完成,且会话写请求队列不为空,会话待调度
 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) 
         && !session.isScheduledForFlush()) {                       
     //调度刷新会话                                                 
     scheduleFlush(session);        
 }  

 //调度写请求会话,及添加到刷新队列
 private void scheduleFlush(S session) {
            // add the session to the queue if it's not already
            // in the queue
            if (session.setScheduledForFlush(true)) {
                flushingSessions.add(session);
            }
 }

f.3
 scheduleRemove(session);//移除会话调度
session.closeNow();//异常立刻关闭会话
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e); 

//添加回到到移除队列
private void scheduleRemove(S session) {
    if (!removingSessions.contains(session)) {
        removingSessions.add(session);
    }
}

g.
 // And manage removed sessions
//移除已关闭的会话
nSessions -= removeSessions();

//移除会话
private int removeSessions() {
    int removedSessions = 0;
    //遍历移除会话队列,如果poll的会话不为空,则获取会话状态,
    for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
        SessionState state = getState(session);

        // Now deal with the removal accordingly to the session's state
        switch (state) {
        case OPENED:
            // Try to remove this session
	    //尝试移除会话
            if (removeNow(session)) {
                removedSessions++;
            }
            break;
        case CLOSING:
            // Skip if channel is already closed
            // In any case, remove the session from the queue
	    //会话关闭,则更新会话移除计数器
            removedSessions++;
            break;
        case OPENING:
            // Remove session from the newSessions queue and
            // remove it
	    //正在打开从新创建会话对垒移除会话
            newSessions.remove(session);
            if (removeNow(session)) {
                removedSessions++;
            }
            break;
        default:
            throw new IllegalStateException(String.valueOf(state));
        }
    }
    return removedSessions;
}

//尝试移除会话
private boolean removeNow(S session) {
   //清除会话写请求队列
    clearWriteRequestQueue(session);
    try {
        //销毁会话
        destroy(session);
        return true;
    } catch (Exception e) {
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
    } finally {
        try {
            clearWriteRequestQueue(session);
            ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
        } catch (Exception e) {
            // The session was either destroyed or not at this point.
            // We do not want any exception thrown from this "cleanup" code
            // to change
            // the return value by bubbling up.
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }
    return false;
}

//清除会话写请求队列
 private void clearWriteRequestQueue(S session) {
          WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
          WriteRequest req;
          List<WriteRequest> failedRequests = new ArrayList<>();
          if ((req = writeRequestQueue.poll(session)) != null) {
              Object message = req.getMessage();
              if (message instanceof IoBuffer) {
                  IoBuffer buf = (IoBuffer) message;

                  // The first unwritten empty buffer must be
                  // forwarded to the filter chain.
		  //如果会话写请求buffer还有数据,添加写请求到失败写请求集合
                  if (buf.hasRemaining()) {
                      buf.reset();
                      failedRequests.add(req);
                  } else {
                      IoFilterChain filterChain = session.getFilterChain();
		      //触发会话过滤链消息发送事件fireMessageSent
                      filterChain.fireMessageSent(req);
                  }
              } else {
                  failedRequests.add(req);
              }
              // Discard others.丢弃其余的会话写请求
              while ((req = writeRequestQueue.poll(session)) != null) {
                  failedRequests.add(req);
              }
          }
          // Create an exception and notify.
          if (!failedRequests.isEmpty()) {
              WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
	      //更新会话调度字节计数器
              for (WriteRequest r : failedRequests) {
                  session.decreaseScheduledBytesAndMessages(r);
                  r.getFuture().setException(cause);
              }
              IoFilterChain filterChain = session.getFilterChain();
              filterChain.fireExceptionCaught(cause);
          }
      }

 
/**
     * Destroy the underlying client socket handle
     * 关闭底层客户端socket
     * @param session
     *            the {@link IoSession}
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract void destroy(S session) throws Exception;

h.
// Last, not least, send Idle events to the idle sessions
//通知会话空闲事件
notifyIdleSessions(currentTime);


private void notifyIdleSessions(long currentTime) throws Exception {
           // process idle sessions
           if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
               lastIdleCheckTime = currentTime;
	       //通知会话空闲,
               AbstractIoSession.notifyIdleness(allSessions(), currentTime);
           }
       }

//AbstractIoSession
//遍历会话集,通知会话空闲
 public static void notifyIdleness(Iterator sessions, long currentTime)
    {
        do
        {
            if(!sessions.hasNext())
                break;
            IoSession session = (IoSession)sessions.next();
            if(!session.getCloseFuture().isClosed())
                notifyIdleSession(session, currentTime);
        } while(true);
    }


public static void notifyIdleSession(IoSession session, long currentTime)
    {
       //通知会话读写空闲
        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE), IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE), IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
        notifyWriteTimeout(session, currentTime);
    }
    //触发会话空闲状态
    private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status, long lastIoTime)
    {
        if(idleTime > 0L && lastIoTime != 0L && currentTime - lastIoTime >= idleTime)
            session.getFilterChain().fireSessionIdle(status);
    }
    //通知会话超时
    private static void notifyWriteTimeout(IoSession session, long currentTime)
    {
        long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
        if(writeTimeout > 0L && currentTime - session.getLastWriteTime() >= writeTimeout && !session.getWriteRequestQueue().isEmpty(session))
        {
            WriteRequest request = session.getCurrentWriteRequest();
            if(request != null)
            {
	       //设置会话写请求超时异常
                session.setCurrentWriteRequest(null);
                WriteTimeoutException cause = new WriteTimeoutException(request);
                request.getFuture().setException(cause);
                session.getFilterChain().fireExceptionCaught(cause);
                session.closeNow();
            }
        }
    }


i.
 // Disconnect all sessions immediately if disposal has been
// requested so that we exit this loop eventually.
//判断Io处理器是否正在关闭,如果正在关闭断开所有会话
if (isDisposing()) {
    boolean hasKeys = false;
    //获取当前处理器管理的会话,移除会话
    for (Iterator<S> i = allSessions(); i.hasNext();) {
        IoSession session = i.next();

        if (session.isActive()) {
            scheduleRemove((S) session);
            hasKeys = true;
        }
    }
    if (hasKeys) {
        wakeup();
    }
}

j.
t
ry {
       synchronized (disposalLock) {
           if (disposing) {
   	     //如果正在关闭,完成实际关闭工作
               doDispose();
           }
       }
   } catch (Exception e) {
       //捕捉异常
       ExceptionMonitor.getInstance().exceptionCaught(e);
   } finally {
	     //已关闭
             disposalFuture.setValue(true);
}

/**
 * Dispose the resources used by this {@link IoProcessor} for polling the
 * client connections. The implementing class doDispose method will be
 * called.
 * 释放IO处理器相关的资源
 * @throws Exception
 *             if some low level IO error occurs
 */
protected abstract void doDispose() throws Exception;


从上面来看处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,
首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列
获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,
如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,
则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;
遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。

再来看启动处理器方法#startupProcessor的第二点
2.
//执行处理器                                                         
executor.execute(new NamePreservingRunnable(processor, threadName));

/**
 * A {@link Runnable} wrapper that preserves the name of the thread after the runnable is
 * complete (for {@link Runnable}s that change the name of the Thread they use.)
 * 将Runnable包装成一个新的线程,只是线程名不同,线程运行完,恢复原始线程名。
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev: 446581 $, $Date: 2006-09-15 11:36:12Z $,
 */
public class NamePreservingRunnable implements Runnable {
    private final Logger logger = LoggerFactory.getLogger(NamePreservingRunnable.class);

    private final String newName;//新线程名
    private final Runnable runnable;//实际线程

    public NamePreservingRunnable(Runnable runnable, String newName) {
        this.runnable = runnable;
        this.newName = newName;
    }

    public void run() {
        Thread currentThread = Thread.currentThread();
        String oldName = currentThread.getName();
        if (newName != null) {
            setName(currentThread, newName);
        }
        try {
            runnable.run();
        } finally {
            setName(currentThread, oldName);
        }
    }
    
    /**
     * Wraps {@link Thread#setName(String)} to catch a possible {@link Exception}s such as
     * {@link SecurityException} in sandbox environments, such as applets
     设置线程名
     */
    private void setName(Thread thread, String name) {
        try {
            thread.setName(name);
        } catch (Exception e) {
            // Probably SecurityException.
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to set the thread name.", e);
            }
        }
    }
}

回到添加会话方法:
 /**
  * {@inheritDoc}
  */
//会话创建时,添加回到到处理器,
 @Override
 public final void add(S session) {
     if (disposed || disposing) {//如果处理器已关闭,则抛出非法状态异常
         throw new IllegalStateException("Already disposed.");
     }
     // Adds the session to the newSession queue and starts the worker
	//添加会话到Io处理器的创建会话队列中
     newSessions.add(session);
      //启动一个Io处理器线程
     startupProcessor();
 }


小节,从上面来年,添加会话首先添加会话到Io处理器的创建会话队列中,启动处理器线程Processor。处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,
首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列
获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流
关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;
如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。
再来看其他方法:
 /**
  * {@inheritDoc}
  将写请求添加到会话写请求队列
  */
 @Override
 public void write(S session, WriteRequest writeRequest) {
     WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

     writeRequestQueue.offer(session, writeRequest);

     if (!session.isWriteSuspended()) {
         //刷新会话
         this.flush(session);
     }
 }
   /**
     * {@inheritDoc}
     //添加会话到刷新会话队列
     */
    @Override
    public final void flush(S session) {
        // add the session to the queue if it's not already
        // in the queue, then wake up the select()
	//设置会话正在调度flush
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
            wakeup();
        }
    }
    /**
     * {@inheritDoc}
     移除会话,添加会话到移除会话队列,启动处理器线程
     */
    @Override
    public final void remove(S session) {
        scheduleRemove(session);
        startupProcessor();
    }

    /**
     * {@inheritDoc}
     //释放Io处理器资源
     */
    @Override
    public final void dispose() {
        if (disposed || disposing) {
            return;
        }
        synchronized (disposalLock) {
            disposing = true;
            startupProcessor();
        }
        disposalFuture.awaitUninterruptibly();
        disposed = true;
    }
   /**
     * {@inheritDoc}
     */
    @Override
    public final boolean isDisposing() {
        return disposing;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final boolean isDisposed() {
        return disposed;
    }

总结:

     抽象Io处理器AbstractPollingIoProcessor,主要几个关键内部变量为选择操作超时时间SELECT_TIMEOUT,用于腾出时间,处理空闲的会话; executor处理器内部执行器,用于运行内部处理器Processor;存储Io处理器等线程最大线程id的threadIds(Map);创建会话队列newSessions用于存储新创建的会话;移除会话队列removingSessions用于存放从处理器移除的会话;刷新会话队列flushingSessions,用于存放要发送写请求的会话;次序控制会话队列trafficControllingSessions用于存放会话待读写的会话;Io处理器线程引用processorRef。
     添加会话首先添加会话到Io处理器的创建会话队列中,启动处理器线程Processor。处理器的实际工作,尝试10次nbTries选择操作,在每次选择操作过程中,首先进行超时选择操作,然后检查Io处理器是否断开连接,尝试次数nbTries是否为零如果为0,则注册新的选择器;然后遍历创建会话队列,从队列拉取会话,如果会话为不null,则初始化会话,构建会话过滤链(从IoService继承)触发会话过滤链的会话创建和会话打开事件,并记录新创建的会话数量nSessions;更会会话状态,此过程为从会话次序控制队列获取会话,检查会话状态,如果状态为OPENED更新会话的读写状态,如果为OPENING放回次序控制会话队列;如果选择操作返回的SELECTKey的值大于0,即有相关的兴趣操作事件(读写事件),遍历选择后读写等操作就绪的会话,如果会话可读,则读取会话缓存区数据到buffer,触发过滤链消息接收事件MessageReceive,接收完消息后,如果会话输入流关闭则触发过滤链fireInputClosed事件,如果在这过程有异常发生,则触发过滤链异常事件ExceptionCaught,如果会话可写,则添加会话到刷新会话队列;遍历刷新会话队列,根据会话写请求消息类型为IoBuffer还是FileRegion,发送会话数据,发送会话数据后,如果会话还有些请求,则添加会话到队列,如果在这个过程中有异常,则添加会话到会话移除队列;遍历会话移除队列,如果会话为关闭,则尝试关闭会话,并清除会话写请求队列,如果会话数据已发送完,则触发会话过滤链消息发送事件fireMessageSent;更新处理器会话计数器nSessions;遍历处理器所有会话,触发会话过滤器会话空闲时间fireSessionIdle;如果在这个过程中,处理器会话计数器nSessions为0,则清除处理器引用;如果Io处理器正在关闭,则添加所有会话到移除会话队列,释放Io处理器先关的资源。
    抽象Io处理器AbstractPollingIoProcessor主要是处理IoProcessor关联会话message*事件,而所有的工作,都是通过处理器线程Processor完成。每当有会话添加到IoProcessor,则启动一个处理器线程Processor,处理会话的读写操作及相关事件。就连IoProcessor资源的释放,也是由处理器线程Processor处理。关闭IoProcessor时,现将处理器关联会话,添加移除会话队列,实际工作由IoProcessor的子类的doDispose方法完成。


附:
//SessionState会话状态
public final class SessionState extends Enum
{
    public static final SessionState OPENING;
    public static final SessionState OPENED;
    public static final SessionState CLOSING;
    private static final SessionState $VALUES[];
    private SessionState(String s, int i)
    {
        super(s, i);
    }
    static 
    {
        OPENING = new SessionState("OPENING", 0);
        OPENED = new SessionState("OPENED", 1);
        CLOSING = new SessionState("CLOSING", 2);
        $VALUES = (new SessionState[] {
            OPENING, OPENED, CLOSING
        });
    }
    public static SessionState[] values()
    {
        return (SessionState[])$VALUES.clone();
    }

    public static SessionState valueOf(String name)
    {
        return (SessionState)Enum.valueOf(org/apache/mina/core/session/SessionState, name);
    }
}

//原子引用AtomicReference
/**
 * An object reference that may be updated atomically. See the {@link
 * java.util.concurrent.atomic} package specification for description
 * of the properties of atomic variables.
 * @since 1.5
 * @author Doug Lea
 * @param <V> The type of object referred to by this reference
 */
public class AtomicReference<V>  implements java.io.Serializable {
    private static final long serialVersionUID = -1848883965231344442L;
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicReference.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }
    private volatile V value;

    /**
     * Creates a new AtomicReference with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicReference(V initialValue) {
        value = initialValue;
    }
    /**
     * Creates a new AtomicReference with null initial value.
     */
    public AtomicReference() {
    }
    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final V get() {
        return value;
    }
    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(V newValue) {
        value = newValue;
    }
    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(V newValue) {
        unsafe.putOrderedObject(this, valueOffset, newValue);
    }
    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }
    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * <p>May [url=package-summary.html#Spurious]fail spuriously[/url]
     * and does not provide ordering guarantees, so is only rarely an
     * appropriate alternative to {@code compareAndSet}.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful.
     */
    public final boolean weakCompareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }
    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final V getAndSet(V newValue) {
        while (true) {
            V x = get();
            if (compareAndSet(x, newValue))
                return x;
        }
    }

    /**
     * Returns the String representation of the current value.
     * @return the String representation of the current value.
     */
    public String toString() {
        return String.valueOf(get());
    }

}
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics