`
Donald_Draper
  • 浏览: 949527 次
社区版块
存档分类
最新评论

Mina Socket与报文过滤链

    博客分类:
  • Mina
阅读更多
Mina 过滤器定义:http://donald-draper.iteye.com/blog/2376161
Mina 日志过滤器与引用计数过滤器:http://donald-draper.iteye.com/blog/2376226
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
Mina 过滤链抽象实现:http://donald-draper.iteye.com/blog/2376335
引言:
在前一篇文章中我们看了一下过滤链的抽象实现,先来回顾一下
  AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry。
      EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。过滤链头为HeadFilter,链尾为TailFilter。
     HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。
     TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。
     添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;
添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同;添加前触发过滤器onPreAdd事件,添加后触发过滤器onPostAdd事件;移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。
     过滤链处理相关事件策略为:与IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,关闭会话,handler并不处理会话事件) 。
今天我们来看具体的过滤链实现SocketFilterChain和DatagramFilterChain,在AbstractIoFilterChain
这篇文章中我们我们过滤链触发IoSession的write/close事件除了传递事件外,
调用实际的事件操作doWrite/doClose,这两个方法为抽象方法,需要子类扩展实现。
在SocketFilterChain和DatagramFilterChain具体中主要是doWrite/doClose这两个方法的具体实现。
先来看一下SocketFilterChain
package org.apache.mina.transport.socket.nio;

import java.io.IOException;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
import org.apache.mina.util.Queue;

/**
 * An {@link IoFilterChain} for socket transport (TCP/IP).
 * 
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 */
class SocketFilterChain extends AbstractIoFilterChain {

    SocketFilterChain(IoSession parent) {
        super(parent);
    }

    protected void doWrite(IoSession session, WriteRequest writeRequest) {
        SocketSessionImpl s = (SocketSessionImpl) session;
	//获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲
        Queue writeRequestQueue = s.getWriteRequestQueue();

        // SocketIoProcessor.doFlush() will reset it after write is finished
        // because the buffer will be passed with messageSent event. 
	//这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件,
	//待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置
        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
        buffer.mark();
        int remaining = buffer.remaining();
        if (remaining == 0) {
	    //BaseIoSession
	    // private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
            //更新调度请求计数器+1
            s.increaseScheduledWriteRequests();            
        } else {
	    //BaseIoSession
	    //private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
	    //更新调度写字节计数器+buffer.remaining()
            s.increaseScheduledWriteBytes(buffer.remaining());
        }

        synchronized (writeRequestQueue) {
	   //将写请求添加到session写请求队列中
            writeRequestQueue.push(writeRequest);
        }
        //如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作,这个在以后在具体详说
        if (session.getTrafficMask().isWritable()) {
            s.getIoProcessor().flush(s);
        }
    }
    //关闭会话
    protected void doClose(IoSession session) throws IOException {
        SocketSessionImpl s = (SocketSessionImpl) session;
        s.getIoProcessor().remove(s);//委托给session关联的IoProcessor
    }
}

来看SocketFilterChain实际会话关闭工作
//关闭会话
    protected void doClose(IoSession session) throws IOException {
        SocketSessionImpl s = (SocketSessionImpl) session;
        s.getIoProcessor().remove(s);
    }

//SocketIoProcessor
class SocketIoProcessor {
    ...
    private final Queue removingSessions = new Queue();//存放关闭的会话队列
     void remove(SocketSessionImpl session) throws IOException {
        scheduleRemove(session);//将会话添加到待移除会话队列
        startupWorker();//这一步我们在后面将SocketIoProcessor的时候再说
    }
    private void scheduleRemove(SocketSessionImpl session) {
        synchronized (removingSessions) {
            removingSessions.push(session);
        }
    }
   ...
}

小节:
SocketFilterChain发送消息首先获取Socket会话的的写请求队列;mark buffer的位置,
主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法
将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作。关闭session,即将会话添加到关联的IoProcessor待移除会话队列。

再来看一下DatagramSessionImpl:
package org.apache.mina.transport.socket.nio.support;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.AbstractIoFilterChain;
import org.apache.mina.util.Queue;

/**
 * An {@link IoFilterChain} for datagram transport (UDP/IP).
 * 
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 */
class DatagramFilterChain extends AbstractIoFilterChain {

    DatagramFilterChain(IoSession parent) {
        super(parent);
    }

    protected void doWrite(IoSession session, WriteRequest writeRequest) {
        DatagramSessionImpl s = (DatagramSessionImpl) session;
	//获取Socket会话的的写请求队列,Queue继承于AbstractList,这个我们在后面再讲
        Queue writeRequestQueue = s.getWriteRequestQueue();

        // SocketIoProcessor.doFlush() will reset it after write is finished
        // because the buffer will be passed with messageSent event. 
        //这里之所以要mark buffer的位置,主要是buffer要传给messageSent事件,
	//待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置
        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
        buffer.mark();
        int remaining = buffer.remaining();
        if (remaining == 0) {
	    //BaseIoSession
	    // private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
            //更新调度请求计数器+1
            s.increaseScheduledWriteRequests();            
        } else {
	     //BaseIoSession
	    //private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
	    //更新调度写字节计数器+buffer.remaining()
            s.increaseScheduledWriteBytes(buffer.remaining());
            s.increaseScheduledWriteBytes(buffer.remaining());
        }

        synchronized (writeRequestQueue) {
	    //将写请求添加到session写请求队列中
            writeRequestQueue.push(writeRequest);
        }
        
        if (session.getTrafficMask().isWritable()) {
	     //DatagramSessionImpl
	     //private final DatagramService managerDelegate;
	    //如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作,
	    //这个在以后在具体详说
            s.getManagerDelegate().flushSession(s);
        }
    }

    protected void doClose(IoSession session) {
        DatagramSessionImpl s = (DatagramSessionImpl) session;
        DatagramService manager = s.getManagerDelegate();
	////委托给session关联的managerDelegate(DatagramService)关闭会话
        if (manager instanceof DatagramConnectorDelegate) {
	    //如果是DatagramConnectorDelegate者直接关闭会话,则在后面具体再看
            ((DatagramConnectorDelegate) manager).closeSession(s);
        } else {
	    //通知DatagramAcceptorDelegate的监听器会话已关闭
            ((DatagramAcceptorDelegate) manager).getListeners()
                    .fireSessionDestroyed(session);
	    //设置会话CloseFuture为已关闭状态
            session.getCloseFuture().setClosed();
        }
    }
}

从上面DatagramSessionImpl关联的managerDelegate(DatagramService)两种分别为DatagramConnectorDelegate和DatagramAcceptorDelegate
//DatagramAcceptorDelegate
public class DatagramAcceptorDelegate extends BaseIoAcceptor implements
        IoAcceptor, DatagramService {

//DatagramConnectorDelegate
public class DatagramConnectorDelegate extends BaseIoConnector implements
        DatagramService {

今天这篇文章只是简单做一个简单的介绍,主要是对上一篇过滤链抽象实现的补充,文章中涉及的IoService和
IoProcessor我们还有讲到,后面讲到是在具体的说。
从上面可以看出,DatagramFilterChain发送消息首先获取报文会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作。关闭会话委托给session关联的managerDelegate(DatagramService),如果managerDelegate为DatagramConnectorDelegate者直接关闭,如果为DatagramAcceptorDelegate,通知DatagramAcceptorDelegate的监听器会话已关闭,设置会话CloseFuture为已关闭状态。


总结:
     SocketFilterChain发送消息首先获取Socket会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session运行写操作,获取session关联的IoProcessor完成实际的消息发送工作。关闭session,即将会话添加到关联的IoProcessor待移除会话队列。
      DatagramFilterChain发送消息首先获取报文会话的的写请求队列;mark buffer的位置,主要因为buffer要传给messageSent事件,待消息发送完成,SocketIoProcessor.doFlush方法将会reset buffer到当前mark的位置;根据buffer的实际数据容量来判断是更新调度请求计数器还是更新调度写字节计数器;将写请求添加到session写请求队列中,如果session允许写操作,获取session关联的managerDelegate(DatagramService)完成实际的消息发送工作。
关闭会话委托给session关联的managerDelegate(DatagramService),如果managerDelegate为DatagramConnectorDelegate者直接关闭,如果为DatagramAcceptorDelegate,通知DatagramAcceptorDelegate的监听器会话已关闭,设置会话CloseFuture为已关闭状态。
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics