`
Donald_Draper
  • 浏览: 945730 次
社区版块
存档分类
最新评论

Mina 报文监听器NioDatagramAcceptor二(发送会话消息等)

    博客分类:
  • Mina
阅读更多
Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器):http://donald-draper.iteye.com/blog/2379152
引言:
    前面一篇文章我们看了报文监听器NioDatagramAcceptor的内部变量,构造和IO处理器相关的功能,先来回顾一下:
    报文监听器NioDatagramAcceptor,内部有一个注册队列registerQueue,用于存放地址绑定的请求,一个取消队列,用于存放地址解绑请求,一个Map-boundHandles,用于存放socket地址与报文通道映射映射关系,会话管理器sessionRecycler,监控连接Service的会话,如果会话过期,关闭过期的会话,一个通道选择器selector处理报文通道的读写操作事件,一个监听器线程acceptor,用于处理地址绑定和解绑,报文通道读写事件,发送会话消息及销毁监听器工作。报文监听器构造主要是初始化会话配置,IO事件执行器和打开选择器。报文监听器写操作,首先获取会话写请求队列,计算会话最大发送字节数,获取会话写请求buffer;如果写请求为空,则从请求队列poll一个写请求,然后获取写请求buffer及写请求目的socket地址,委托会话关联的报文通道发送数据;如果buffer数据太多或没有写成功,添加写请求到会话请求队列,关注写事件,否则取消关注写事件,置空会话当前写请求,触发会话发送事件。绑定地址,首先添加地址绑定请求到注册队列registerQueue,启动监听器线程acceptor,唤醒选择操作,然后等待地址绑定完成,最后返回报文通道绑定的socket地址集。
现在我们来看NioDatagramAcceptor的IoAcceptor和Io服务相关功能的实现:先贴出报文监听器NioDatagramAcceptor的内部变量声明,以便理解后面的内容,
/**
 * {@link IoAcceptor} for datagram transport (UDP/IP).
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 * @org.apache.xbean.XBean
 */
public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {

    /**
     * A session recycler that is used to retrieve an existing session, unless it's too old.
     默认过期会话回收器
     **/
    private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
    /**
     * A timeout used for the select, as we need to get out to deal with idle
     * sessions 选择超时时间
     */
    private static final long SELECT_TIMEOUT = 1000L;
    /** A lock used to protect the selector to be waked up before it's created */
    private final Semaphore lock = new Semaphore(1);
    /** A queue used to store the list of pending Binds 地址绑定请求*/
    private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
    //地址解绑请求队列
    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
    //刷新会话队列,IO处理器刷新操作会用到,暂存刷新操作的会话
    private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>();
    // socket地址与报文通道映射Map,绑定操作使socket地址与报文通道关联起来
    private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
            .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
    //会话管理器,监控连接Service的会话,如果会话过期,关闭过期的会话
    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
    private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
    private volatile boolean selectable;
    /** The thread responsible of accepting incoming requests */
    private Acceptor acceptor;//监听器线程
    private long lastIdleCheckTime;//上次空闲检查时间
    /** The Selector used by this acceptor 选择器*/
    private volatile Selector selector;
}

回到上一篇文章启动监听器线程片段startupAcceptor
/**
 * Starts the inner Acceptor thread.
 */
private void startupAcceptor() throws InterruptedException {
    if (!selectable) {
        //如果选择器初始化失败,则清空注册队列,取消队列及刷新会话队列
        registerQueue.clear();
        cancelQueue.clear();
        flushingSessions.clear();
    }
    lock.acquire();
    if (acceptor == null) {
        //创建Acceptor线程实例,并执行
        acceptor = new Acceptor();
        executeWorker(acceptor);
    } else {
        lock.release();
    }
}

下面来看一下Acceptor的定义:
/**
  * This private class is used to accept incoming connection from
  * clients. It's an infinite loop, which can be stopped when all
  * the registered handles have been removed (unbound).
  接收客户端的连接。主操作是一个无限循环,当所有绑定的地址的报文通道解绑时,
  循环退出
  */
 private class Acceptor implements Runnable {
     @Override
     public void run() {
         int nHandles = 0;
         lastIdleCheckTime = System.currentTimeMillis();
         // Release the lock
         lock.release();
         while (selectable) {
             try {
	         //超时选择
                 int selected = select(SELECT_TIMEOUT);
		 //处理地址绑定请求
                 nHandles += registerHandles();
                 if (nHandles == 0) {
                     try {
		         //如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程
                         lock.acquire();
                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                             acceptor = null;
                             break;
                         }
                     } finally {
                         lock.release();
                     }
                 }
                 if (selected > 0) {
		     //处理读写操作时间就绪的会话
                     processReadySessions(selectedHandles());
                 }
                 long currentTime = System.currentTimeMillis();
		 //发送刷新队列中的写请求
                 flushSessions(currentTime);
		 //处理报文通道地址解绑请求
                 nHandles -= unregisterHandles();
		 //通知会话空闲
                 notifyIdleSessions(currentTime);
             } catch (ClosedSelectorException cse) {
                 // If the selector has been closed, we can exit the loop
                 ExceptionMonitor.getInstance().exceptionCaught(cse);
                 break;
             } catch (Exception e) {
                 ExceptionMonitor.getInstance().exceptionCaught(e);
                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e1) {
                 }
             }
         }
          //如何Io处理器正在关闭,则销毁报文监听器
         if (selectable && isDisposing()) {
             selectable = false;
             try {
                 destroy();
             } catch (Exception e) {
                 ExceptionMonitor.getInstance().exceptionCaught(e);
             } finally {
                 disposalFuture.setValue(true);
             }
         }
     }
}

监听器线程有一下几点要关注:
1.
//处理地址绑定请求
nHandles += registerHandles();

2.
if (selected > 0) {                          
    //处理读写操作时间就绪的会话             
    processReadySessions(selectedHandles()); 
}  
 
3.
//发送刷新队列中的写请求
flushSessions(currentTime);

4.
//处理报文通道地址解绑请求
nHandles -= unregisterHandles();

5.
//通知会话空闲
notifyIdleSessions(currentTime);

6.
 //如何Io处理器正在关闭,则销毁报文监听器                  
if (selectable && isDisposing()) {                         
    selectable = false;                                    
    try {                                                  
        destroy();                                         
    } catch (Exception e) {                                
        ExceptionMonitor.getInstance().exceptionCaught(e); 
    } finally {                                            
        disposalFuture.setValue(true);                     
    }                                                      
}  
                                                       
我们分别来以上几点:
1.
//处理地址绑定请求
nHandles += registerHandles();


private int registerHandles() {
    for (;;) {
        //从注册队列,poll地址绑定请求
        AcceptorOperationFuture req = registerQueue.poll();
        if (req == null) {
            break;
        }
        Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>();
        List<SocketAddress> localAddresses = req.getLocalAddresses();
        try {
	   //遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道
            for (SocketAddress socketAddress : localAddresses) 
                DatagramChannel handle = open(socketAddress);
		//添加socket地址与报文通道映射到集合newHandles
                newHandles.put(localAddress(handle), handle);
            }
            添加socket地址与报文通道映射到boundHandles
            boundHandles.putAll(newHandles);
            //通知service监听,服务已开启,及触发fireServiceActivated事件
            getListeners().fireServiceActivated();
	    //地址绑定结束
            req.setDone();
            return newHandles.size();
        } catch (Exception e) {
            req.setException(e);
        } finally {
            // Roll back if failed to bind all addresses.
	    //如果异常,则关闭报文通道
            if (req.getException() != null) {
                for (DatagramChannel handle : newHandles.values()) {
                    try {
                        close(handle);
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
                wakeup();
            }
        }
    }
    return 0;
}

来看打开通道方法:
protected DatagramChannel open(SocketAddress localAddress) throws Exception {
    //打开一个报文通道
    final DatagramChannel ch = DatagramChannel.open();
    boolean success = false;
    try {
       //配置通道会话及阻塞模式
        new NioDatagramSessionConfig(ch).setAll(getSessionConfig());
        ch.configureBlocking(false);

        try {
	    //绑定地址
            ch.socket().bind(localAddress);
        } catch (IOException ioe) {
            // Add some info regarding the address we try to bind to the
            // message
            String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
                    + ioe.getMessage();
            Exception e = new IOException(newMessage);
            e.initCause(ioe.getCause());
            // And close the channel
            ch.close();
            throw e;
        }
         //注册报文通道读操作事件OP_READ到选择器selector
        ch.register(selector, SelectionKey.OP_READ);
        success = true;
    } finally {
        if (!success) {
            close(ch);
        }
    }

    return ch;
}

从上面来看,处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,
通知service监听,服务已开启,触发fireServiceActivated事件;

再来看第二点:
2.
if (selected > 0) {                          
    //处理读写操作时间就绪的会话             
    processReadySessions(selectedHandles()); 
}  

 private void processReadySessions(Set<SelectionKey> handles) {
        Iterator<SelectionKey> iterator = handles.iterator();
	//遍历读写操作事件就绪的报文通道
        while (iterator.hasNext()) {
            //获取选择key,及报文通道
            SelectionKey key = iterator.next();
            DatagramChannel handle = (DatagramChannel) key.channel();
            iterator.remove();
            try {
	        //执行读操作
                if (key.isValid() && key.isReadable()) {
                    readHandle(handle);
                }
		//执行写操作
                if (key.isValid() && key.isWritable()) {
                    for (IoSession session : getManagedSessions().values()) {
                        scheduleFlush((NioSession) session);
                    }
                }
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            }
        }
}

这一点有两点要关注
2.a
//执行读操作                              
if (key.isValid() && key.isReadable()) {  
    readHandle(handle);                   
}         
                               
private void readHandle(DatagramChannel handle) throws Exception {
        IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
	//接收数据
        SocketAddress remoteAddress = receive(handle, readBuf);
        if (remoteAddress != null) {
	    //创建会话
            IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
            readBuf.flip();
	    //触发会话过滤链的消息接收事件fireMessageReceived
            session.getFilterChain().fireMessageReceived(readBuf);
        }
}

来看报文读处理的数据接收和会话创建
2.a.1
//接收数据
protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
        return handle.receive(buffer.buf());
}

2.a.2
//创建会话
private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    //获取远端socket地址关联的报文通道
    DatagramChannel handle = boundHandles.get(localAddress);
    if (handle == null) {
        throw new IllegalArgumentException("Unknown local address: " + localAddress);
    }
    IoSession session;
    synchronized (sessionRecycler) {
        //从会话管理器,获取远端socket地址会话,以便重用
        session = sessionRecycler.recycle(remoteAddress);
        if (session != null) {
            return session;
        }
        // If a new session needs to be created.
	//创建会话
        NioSession newSession = newSession(this, handle, remoteAddress);
	//将会话添加会话管理器,监控会话
        getSessionRecycler().put(newSession);
        session = newSession;
    }
    //初始化会话
    initSession(session, null, null);
    try {
        //构建会话过滤链
        this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
	//通知Service监听器发生会话创建事件fireSessionCreated
        getListeners().fireSessionCreated(session);
    } catch (Exception e) {
        ExceptionMonitor.getInstance().exceptionCaught(e);
    }
    return session;
}

来看创建会话这一点
//创建会话
NioSession newSession = newSession(this, handle, remoteAddress);

//根据Io处理器,报文通道及远端socket地址创建会话
 protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
            SocketAddress remoteAddress) {
        //获取报文通道注册到选择器的选择key
        SelectionKey key = handle.keyFor(selector);
        if ((key == null) || (!key.isValid())) {
            return null;
        }
        //创建报文会话
        NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
	//设置会话选择key
        newSession.setSelectionKey(key);
        return newSession;
    }

默认会话管理器sessionRecycler,见附;
2.b
//执行写操作                                                  
if (key.isValid() && key.isWritable()) {    
    //调度Service管理的会话
    for (IoSession session : getManagedSessions().values()) { 
        scheduleFlush((NioSession) session);                  
    }                                                         
}   
                                                         
从上面可以看出,处理报文通道就绪续事件,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列;

再来看发送刷新队列的会话写请求:
3.
//发送刷新队列中的会话写请求
flushSessions(currentTime);


private void flushSessions(long currentTime) {
    for (;;) {
        //从刷新队列获取会话
        NioSession session = flushingSessions.poll();
        if (session == null) {
            break;
        }
        // Reset the Schedule for flush flag for this session,
        // as we are flushing it now
	//设置会话为未调度
        session.unscheduledForFlush();
        try {
	    //刷新会话
            boolean flushedAll = flush(session, currentTime);
            if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
	        //如果刷新成功,但会话写请求队列不为空,且未调度,则重新调度会话
                scheduleFlush(session);
            }
        } catch (Exception e) {
            session.getFilterChain().fireExceptionCaught(e);
        }
    }
}

//发送会话写请求
 private boolean flush(NioSession session, long currentTime) throws Exception {
        //获取会话写请求队列,会话最大读buffersize
        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
                + (session.getConfig().getMaxReadBufferSize() >>> 1);
        int writtenBytes = 0;
        try {
            for (;;) {
	        //获取会话当前写请求
                WriteRequest req = session.getCurrentWriteRequest();
                if (req == null) {
                    //从写请求队列poll一个写请求
                    req = writeRequestQueue.poll(session);
                    if (req == null) {
		        //设置会话不在关注写事件
                        setInterestedInWrite(session, false);
                        break;
                    }
                    //设置会话当前写请求
                    session.setCurrentWriteRequest(req);
                }
                //获取写请求消息
                IoBuffer buf = (IoBuffer) req.getMessage();

                if (buf.remaining() == 0) {
                    // Clear and fire event
		    //置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent
                    session.setCurrentWriteRequest(null);
                    buf.reset();
                    session.getFilterChain().fireMessageSent(req);
                    continue;
                }
                //获取写请求远端地址
                SocketAddress destination = req.getDestination();
                //如果写请求远端地址为null,则获取会话远端地址
                if (destination == null) {
                    destination = session.getRemoteAddress();
                }
                //发送会话写请求字节序列
                int localWrittenBytes = send(session, buf, destination);

                if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
                    // Kernel buffer is full or wrote too much
		    //如果数据太多或发送数据失败,设置会话关注写操作事件
                    setInterestedInWrite(session, true);
                    return false;
                } else {
		   //数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent
                    setInterestedInWrite(session, false);

                    // Clear and fire event
                    session.setCurrentWriteRequest(null);
                    writtenBytes += localWrittenBytes;
                    buf.reset();
                    session.getFilterChain().fireMessageSent(req);
                }
            }
        } finally {
	    //更新会话写字节计数器
            session.increaseWrittenBytes(writtenBytes, currentTime);
        }

        return true;
}

//委托会话关联的报文通道发送会话消息字节序列
 protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
        return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
 }

从上面可以看出处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话
4.
//处理报文通道地址解绑请求
nHandles -= unregisterHandles();

private int unregisterHandles() {
    int nHandles = 0;
    for (;;) {
       //从取消队列,poll地址解绑请求
        AcceptorOperationFuture request = cancelQueue.poll();
        if (request == null) {
            break;
        }
        // close the channels
	//遍历地址解绑请求socket地址集合
        for (SocketAddress socketAddress : request.getLocalAddresses()) {
	    //从socket与报文通道映射集boundHandles移除socket地址
            DatagramChannel handle = boundHandles.remove(socketAddress);
            if (handle == null) {
                continue;
            }
            try {
	        //关闭报文通道
                close(handle);
		//唤醒选择操作
                wakeup(); // wake up again to trigger thread death
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            } finally {
                nHandles++;
            }
        }
	//解绑成功
        request.setDone();
    }
    return nHandles;
}

//关闭通道
protected void close(DatagramChannel handle) throws Exception {
    SelectionKey key = handle.keyFor(selector);
    //取消选择key
    if (key != null) {
        key.cancel();
    }
   //关闭连接及通道
    handle.disconnect();
    handle.close();
}


从上可以看出处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;

5.
//通知会话空闲
notifyIdleSessions(currentTime);


 private void notifyIdleSessions(long currentTime) {
     // process idle sessions
     if (currentTime - lastIdleCheckTime >= 1000) {
         lastIdleCheckTime = currentTime;
	 //通知service管理的会话空闲
         AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
     }
 }

6.
//如何Io处理器正在关闭,则销毁报文监听器                  
if (selectable && isDisposing()) {                         
    selectable = false;                                    
    try {                                                  
        destroy();                                         
    } catch (Exception e) {                                
        ExceptionMonitor.getInstance().exceptionCaught(e); 
    } finally {                                            
        disposalFuture.setValue(true);                     
    }                                                      
}  

//关闭选择器
 protected void destroy() throws Exception {
        if (selector != null) {
            selector.close();
        }
}

来看剩余的方法操作,很简单,不详解:
 
/**
  * {@inheritDoc}
  创建会话
  */
 @Override
 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
     if (isDisposing()) {
         throw new IllegalStateException("The Acceptor is being disposed.");
     }
     if (remoteAddress == null) {
         throw new IllegalArgumentException("remoteAddress");
     }
     synchronized (bindLock) {
         if (!isActive()) {
             throw new IllegalStateException("Can't create a session from a unbound service.");
         }
         try {
	     //创建报文会话
             return newSessionWithoutLock(remoteAddress, localAddress);
         } catch (RuntimeException | Error e) {
             throw e;
         } catch (Exception e) {
             throw new RuntimeIoException("Failed to create a session.", e);
         }
     }
 }
 /**
  * {@inheritDoc}
  解绑地址
  */
 @Override
 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
     AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
     //添加地址解绑请求到取消队列
     cancelQueue.add(request);
     startupAcceptor();//启动监听器线程
     wakeup();//唤醒选择器
     //等待解绑成功
     request.awaitUninterruptibly();
     if (request.getException() != null) {
         throw request.getException();
     }
 }
 /**
  * {@inheritDoc}
  关闭IO处理器相关的资源
  */
 @Override
 protected void dispose0() throws Exception {
     unbind();//解绑地址
     startupAcceptor();//启动监听器线程
     wakeup();
 }
 //选择操作
 protected int select() throws Exception {
     return selector.select();
 }
 protected int select(long timeout) throws Exception {
     return selector.select(timeout);
 }
 //上一次选择后,存在就绪事件的选择key
 protected Set<SelectionKey> selectedHandles() {
     return selector.selectedKeys();
 }
 @Override
 public InetSocketAddress getDefaultLocalAddress() {
     return (InetSocketAddress) super.getDefaultLocalAddress();
 }
 @Override
 public InetSocketAddress getLocalAddress() {
     return (InetSocketAddress) super.getLocalAddress();
 }
 /**
  * {@inheritDoc}
  */
 @Override
 public DatagramSessionConfig getSessionConfig() {
     return (DatagramSessionConfig) sessionConfig;
 }

 @Override
 public final IoSessionRecycler getSessionRecycler() {
     return sessionRecycler;
 }

 @Override
 public TransportMetadata getTransportMetadata() {
     return NioDatagramSession.METADATA;
 }
 protected boolean isReadable(DatagramChannel handle) {
     SelectionKey key = handle.keyFor(selector);

     if ((key == null) || (!key.isValid())) {
         return false;
     }

     return key.isReadable();
 }
 protected boolean isWritable(DatagramChannel handle) {
     SelectionKey key = handle.keyFor(selector);

     if ((key == null) || (!key.isValid())) {
         return false;
     }
     return key.isWritable();
 }
 @Override
 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
     setDefaultLocalAddress((SocketAddress) localAddress);
 }
 @Override
 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
     synchronized (bindLock) {
         if (isActive()) {
             throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
         }

         if (sessionRecycler == null) {
             sessionRecycler = DEFAULT_RECYCLER;
         }

         this.sessionRecycler = sessionRecycler;
     }
 }

在下面这篇文章中,我们讲过报文过滤链,可以集合本文,在回到看看下面这篇文章
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
我们贴出上面这篇文章的报文过滤链的定义:
class DatagramFilterChain extends AbstractIoFilterChain {
    DatagramFilterChain(IoSession parent) {
        super(parent);
    }
    //会话发送写请求,及添加会话写请求队列,待报文监听器调度刷新,即通过会话关联的报文通道
    //发送消息字节序列
    protected void doWrite(IoSession session, WriteRequest writeRequest) {
        DatagramSessionImpl s = (DatagramSessionImpl) session;
	//获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲
        Queue writeRequestQueue = s.getWriteRequestQueue();

        // SocketIoProcessor.doFlush() will reset it after write is finished
        // because the buffer will be passed with messageSent event. 
        //这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件,
	//待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置
        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
        buffer.mark();
        int remaining = buffer.remaining();
        if (remaining == 0) {
	    //BaseIoSession
	    // private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
            //更新调度请求计数器+1
            s.increaseScheduledWriteRequests();            
        } else {
	     //BaseIoSession
	    //private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
	    //更新调度写字节计数器+buffer.remaining()
            s.increaseScheduledWriteBytes(buffer.remaining());
            s.increaseScheduledWriteBytes(buffer.remaining());
        }

        synchronized (writeRequestQueue) {
	    //将写请求添加到session写请求队列中
            writeRequestQueue.push(writeRequest);
        }
        
        if (session.getTrafficMask().isWritable()) {
	     //DatagramSessionImpl
	     //private final DatagramService managerDelegate;
	    //如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作,
	    //这个在以后在具体详说
            s.getManagerDelegate().flushSession(s);
        }
    }

    protected void doClose(IoSession session) {
        DatagramSessionImpl s = (DatagramSessionImpl) session;
        DatagramService manager = s.getManagerDelegate();
	////委托给session关联的managerDelegate(DatagramService)关闭会话
        if (manager instanceof DatagramConnectorDelegate) {
	    //如果是DatagramConnectorDelegate者直接关闭会话,则在后面具体再看
            ((DatagramConnectorDelegate) manager).closeSession(s);
        } else {
	    //通知DatagramAcceptorDelegate的监听器会话已关闭
            ((DatagramAcceptorDelegate) manager).getListeners()
                    .fireSessionDestroyed(session);
	    //设置会话CloseFuture为已关闭状态
            session.getCloseFuture().setClosed();
        }
    }
}

报文过滤链发送会话写请求,即添加会话写请求队列,待报文监听器NioDatagramAcceptor(监听器线程Acceptor)调度刷新(通过会话关联的报文通道发送消息字节序列)。


总结:                                                              
监听器线程Acceptor,首先执行超时选择操作;处理地址绑定请求,首先从注册队列poll地址绑定请求,遍历绑定请求地址集,根据绑定的socket地址打开一个报文通道,配置通道会话及阻塞模式,绑定socket地址,注册报文通道读操作事件OP_READ到选择器selector,添加socket地址与报文通道映射到boundHandles,通知service监听,服务已开启,触发fireServiceActivated事件;  如果没有报文通道处理,则清空注册队列和取消队列,置空监听器线程; 如果选择操作后,有报文通道的读写事件就绪,则遍历读写操作事件就绪的报文通道,如果是读事件,接受报文通道数据,如果远端地址不为空,创建会话,首先从boundHandles获取远端socket地址关联的报文通道,从会话管理器sessionRecycler,获取远端socket地址会话,以便重用,如果会话管理器中不存在,则根据Io处理器,报文通道及远端socket地址创建报文会话,设置会话选择key,将会话添加会话管理器,监控会话,初始化会话,构建会话过滤链,通知Service监听器发生会话创建事件fireSessionCreated;如果是写事件,则调度Service管理的会话,添加到刷新队列; 处理刷新队列,从刷新队列poll写请求会话,获取会话写请求队列,会话最大读buffer size,获取会话当前写请求,获取写请求消息,写请求远端地址,通过会话关联的报文通道发送会话消息字节序列,数据发送成功,置空会话当前写请求,触发会话过滤链消息发送事件fireMessageSent,否则设置会话重新关注写操作事件,如果刷新会话写请求成功,但会话写请求队列不为空,且未调度,则重新调度会话;处理解绑地址请求队列,首先从取消队列,poll地址解绑请求,遍历地址解绑请求socket地址集合,从socket与报文通道映射集boundHandles移除socket地址,关闭报文通道;通知service管理的会话空闲;如何Io处理器正在关闭,则销毁报文监听器。



附:
来看一下默认会话管理器ExpiringSessionRecycler:
/**
 * An {@link IoSessionRecycler} with sessions that time out on inactivity.
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 * @org.apache.xbean.XBean
 */
public class ExpiringSessionRecycler implements IoSessionRecycler {
    /** A map used to store the session 存储会话*/
    private ExpiringMap<SocketAddress, IoSession> sessionMap;
    /** A map used to keep a track of the expiration ,监控会话是否过期线程*/ 
    private ExpiringMap<SocketAddress, IoSession>.Expirer mapExpirer;
    /**
     * Create a new ExpiringSessionRecycler instance
     */
    public ExpiringSessionRecycler() {
        this(ExpiringMap.DEFAULT_TIME_TO_LIVE);
    }
    /**
     * Create a new ExpiringSessionRecycler instance
     * 
     * @param timeToLive The delay after which the session is going to be recycled
     */
    public ExpiringSessionRecycler(int timeToLive) {
        this(timeToLive, ExpiringMap.DEFAULT_EXPIRATION_INTERVAL);
    }
    /**
     * Create a new ExpiringSessionRecycler instance
     * 
     * @param timeToLive The delay after which the session is going to be recycled
     * @param expirationInterval The delay after which the expiration occurs
     */
    public ExpiringSessionRecycler(int timeToLive, int expirationInterval) {
        sessionMap = new ExpiringMap<>(timeToLive, expirationInterval);
        mapExpirer = sessionMap.getExpirer();
	//添加会话过期监听器
        sessionMap.addExpirationListener(new DefaultExpirationListener());
    }

    /**
     * {@inheritDoc}
     添加会话
     */
    @Override
    public void put(IoSession session) {
       //如果检查线程没启动,启动检查线程,监控会话是否过期
        mapExpirer.startExpiringIfNotStarted();
        SocketAddress key = session.getRemoteAddress();
        if (!sessionMap.containsKey(key)) {
            sessionMap.put(key, session);
        }
    }
    /**
     * {@inheritDoc}
     获取远端socket地址对应的会话
     */
    @Override
    public IoSession recycle(SocketAddress remoteAddress) {
        return sessionMap.get(remoteAddress);
    }
    /**
     * {@inheritDoc}
     移除会话
     */
    @Override
    public void remove(IoSession session) {
        sessionMap.remove(session.getRemoteAddress());
    }
    /**
     * Stop the thread from monitoring the map
     停止过期检查线程
     */
    public void stopExpiring() {
        mapExpirer.stopExpiring();
    }
    //配置获取对象生存时间
    /**
     * Update the value for the time-to-live
     *
     * @param timeToLive The time-to-live (seconds)
     */
    public void setTimeToLive(int timeToLive) {
        sessionMap.setTimeToLive(timeToLive);
    }
    /**
     * @return The session time-to-live in second
     */
    public int getTimeToLive() {
        return sessionMap.getTimeToLive();
    }
    //配置获取过期检查间隔
    /**
     * Set the interval in which a session will live in the map before it is removed.
     * 
     * @param expirationInterval The session expiration time in seconds
     */
    public void setExpirationInterval(int expirationInterval) {
        sessionMap.setExpirationInterval(expirationInterval);
    }
     /**
     * @return The session expiration time in second
     */
    public int getExpirationInterval() {
        return sessionMap.getExpirationInterval();
    }
    
    //默认过期监听器,即关闭会话
    private class DefaultExpirationListener implements ExpirationListener<IoSession> {
        @Override
        public void expired(IoSession expiredSession) {
            expiredSession.closeNow();
        }
    }
}

//过期Map-ExpiringMap
**
 * A map with expiration.  This class contains a worker thread that will 
 * periodically check this class in order to determine if any objects 
 * should be removed based on the provided time-to-live value.
 * 过期map包含一个线程,将间歇地检查监控集合delegate中的过期对象ExpiringObject的生存时间是否
 大于timeToLive,大于则从监控集合delegate中移除过期元素对象ExpiringObject。
 * @param <K> The key type
 * @param <V> The value type
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public class ExpiringMap<K, V> implements Map<K, V> {
    /** The default value, 60 seconds */
    public static final int DEFAULT_TIME_TO_LIVE = 60;//对象生存时间,默认60s
    /** The default value, 1 second */
    public static final int DEFAULT_EXPIRATION_INTERVAL = 1;//默认检查间隔1s
    private static volatile int expirerCount = 1;
    private final ConcurrentHashMap<K, ExpiringObject> delegate;//检查线程expirer,监控的Map
    private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;
    private final Expirer expirer;//过期Map元素检查线程
    /**
     * Creates a new instance of ExpiringMap using the default values 
     * DEFAULT_TIME_TO_LIVE and DEFAULT_EXPIRATION_INTERVAL
     *
     */
    public ExpiringMap() {
        this(DEFAULT_TIME_TO_LIVE, DEFAULT_EXPIRATION_INTERVAL);
    }
    /**
     * Creates a new instance of ExpiringMap using the supplied 
     * time-to-live value and the default value for DEFAULT_EXPIRATION_INTERVAL
     *
     * @param timeToLive The time-to-live value (seconds)
     */
    public ExpiringMap(int timeToLive) {
        this(timeToLive, DEFAULT_EXPIRATION_INTERVAL);
    }

    /**
     * Creates a new instance of ExpiringMap using the supplied values and 
     * a {@link ConcurrentHashMap} for the internal data structure.
     *
     * @param timeToLive The time-to-live value (seconds)
     * @param expirationInterval The time between checks to see if a value should be removed (seconds)
     */
    public ExpiringMap(int timeToLive, int expirationInterval) {
        this(new ConcurrentHashMap<K, ExpiringObject>(), new CopyOnWriteArrayList<ExpirationListener<V>>(), timeToLive,
                expirationInterval);
    }

    private ExpiringMap(ConcurrentHashMap<K, ExpiringObject> delegate,
            CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners, int timeToLive, int expirationInterval) {
        this.delegate = delegate;//需要过期检查的对象集合(报文会话)
        this.expirationListeners = expirationListeners;//过期监听器
        this.expirer = new Expirer();//过期检查线程
        expirer.setTimeToLive(timeToLive);//设置对象存活时间
        expirer.setExpirationInterval(expirationInterval);//设置检查线程检查过期元素间隔
   }
   //此处省略一些方法,主要是put,get,contain,remove等操作
   ...
   //过期map元素
    private class ExpiringObject {
        private K key;
        private V value;
        private long lastAccessTime;//上次访问时间
	//可重入读写锁,保护lastAccessTime的读写操作
        private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
        ExpiringObject(K key, V value, long lastAccessTime) {
            if (value == null) {
                throw new IllegalArgumentException("An expiring object cannot be null.");
            }
            this.key = key;
            this.value = value;
            this.lastAccessTime = lastAccessTime;
        }
        public long getLastAccessTime() {
            lastAccessTimeLock.readLock().lock();

            try {
                return lastAccessTime;
            } finally {
                lastAccessTimeLock.readLock().unlock();
            }
        }
        public void setLastAccessTime(long lastAccessTime) {
            lastAccessTimeLock.writeLock().lock();

            try {
                this.lastAccessTime = lastAccessTime;
            } finally {
                lastAccessTimeLock.writeLock().unlock();
            }
        }
        public K getKey() {
            return key;
        }
        public V getValue() {
            return value;
        }
        @Override
        public boolean equals(Object obj) {
            return value.equals(obj);
        }
        @Override
        public int hashCode() {
            return value.hashCode();
        }
    }

    /**
     * A Thread that monitors an {@link ExpiringMap} and will remove
     * elements that have passed the threshold.
     *
     */
    public class Expirer implements Runnable {
       //状态锁
        private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
        private long timeToLiveMillis;//保活时间
        private long expirationIntervalMillis;//过期检查间隔时间
        private boolean running = false;
        private final Thread expirerThread;
        /**
         * Creates a new instance of Expirer.  
         *
         */
        public Expirer() {
            expirerThread = new Thread(this, "ExpiringMapExpirer-" + expirerCount++);
            expirerThread.setDaemon(true);
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public void run() {
            while (running) {
                processExpires();
                try {
                    Thread.sleep(expirationIntervalMillis);
                } catch (InterruptedException e) {
                    // Do nothing
                }
            }
        }

        private void processExpires() {
            long timeNow = System.currentTimeMillis();
            //遍历代理Map中的过期元素ExpiringObject
            for (ExpiringObject o : delegate.values()) {
                if (timeToLiveMillis <= 0) {
                    continue;
                }
                long timeIdle = timeNow - o.getLastAccessTime();
                if (timeIdle >= timeToLiveMillis) {
		   //如果过期,则从代理Map中移除对象
                    delegate.remove(o.getKey());
                    for (ExpirationListener<V> listener : expirationListeners) {
		        //通知过期监听器,过期对象已移除
                        listener.expired(o.getValue());
                    }
                }
            }
        }

        /**
         * Kick off this thread which will look for old objects and remove them.
         *启动过期检查线程
         */
        public void startExpiring() {
            stateLock.writeLock().lock();
            try {
                if (!running) {
                    running = true;
                    expirerThread.start();
                }
            } finally {
                stateLock.writeLock().unlock();
            }
        }

        /**
         * If this thread has not started, then start it.  
         * Otherwise just return;
	 如果过期检查线程没有启动,则启动
         */
        public void startExpiringIfNotStarted() {
            stateLock.readLock().lock();
            
            try {
                if (running) {
                    return;
                }
            } finally {
                stateLock.readLock().unlock();
            }
            stateLock.writeLock().lock();
            try {
                if (!running) {
                    running = true;
                    expirerThread.start();
                }
            } finally {
                stateLock.writeLock().unlock();
            }
        }

        /**
         * Stop the thread from monitoring the map.
	 中断过期检查线程,监控过期Map
         */
        public void stopExpiring() {
            stateLock.writeLock().lock();
            try {
                if (running) {
                    running = false;
                    expirerThread.interrupt();
                }
            } finally {
                stateLock.writeLock().unlock();
            }
        }

        /**
         * Checks to see if the thread is running
         *
         * @return
         *  If the thread is running, true.  Otherwise false.
         */
        public boolean isRunning() {
            stateLock.readLock().lock();

            try {
                return running;
            } finally {
                stateLock.readLock().unlock();
            }
        }
        //配置获取对象生存时间
        /**
         * @return the Time-to-live value in seconds.
         */
        public int getTimeToLive() {
            stateLock.readLock().lock();

            try {
                return (int) timeToLiveMillis / 1000;
            } finally {
                stateLock.readLock().unlock();
            }
        }
        /**
         * Update the value for the time-to-live
         *
         * @param timeToLive
         *  The time-to-live (seconds)
         */
        public void setTimeToLive(long timeToLive) {
            stateLock.writeLock().lock();

            try {
                this.timeToLiveMillis = timeToLive * 1000;
            } finally {
                stateLock.writeLock().unlock();
            }
        }
       //配置获取过期检查间隔
        /**
         * Get the interval in which an object will live in the map before
         * it is removed.
         *
         * @return
         *  The time in seconds.
         */
        public int getExpirationInterval() {
            stateLock.readLock().lock();
            try {
                return (int) expirationIntervalMillis / 1000;
            } finally {
                stateLock.readLock().unlock();
            }
        }
        /**
         * Set the interval in which an object will live in the map before
         * it is removed.
         *
         * @param expirationInterval
         *  The time in seconds
         */
        public void setExpirationInterval(long expirationInterval) {
            stateLock.writeLock().lock();

            try {
                this.expirationIntervalMillis = expirationInterval * 1000;
            } finally {
                stateLock.writeLock().unlock();
            }
        }
    }
}

//对象过期监听器ExpirationListener
public interface ExpirationListener
{
    public abstract void expired(Object obj);
}
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics