`
Donald_Draper
  • 浏览: 949414 次
社区版块
存档分类
最新评论

Mina 日志过滤器与引用计数过滤器

    博客分类:
  • Mina
阅读更多
MINA TCP简单通信实例:http://donald-draper.iteye.com/blog/2375297
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
Mina 过滤器定义:http://donald-draper.iteye.com/blog/2376161
引言:
前面一篇文章我们看了一下过滤器定义,先来回顾一下:
     IoFilter添加到过滤链时,一般以ReferenceCountingIoFilter包装,添加到过滤链,init方法在添加到过滤链时,由ReferenceCountingIoFilter调用,所以可以在init方法初始化一些共享资源。如果过滤器没有包装成ReferenceCountingIoFilter,init方法将不会调用。然后调用onPreAdd通知过滤器将会添加到过滤连上,当过滤器添加到过滤链上时,所有IoHandler事件和IO请求,将会被过滤器拦截当过滤器添加到过滤链上后,将会调用onPostAdd,如果方法有异常,过滤器在过滤链的链尾,ReferenceCountingIoFilter将调用#destroy方法释放共享资源。过滤器IoFilter,主要是监听会话IoSession相关事件(创建,打开,空闲,异常,关闭,接受数据,发送数据) 及IoSesssion的Write与close事件;过滤器后继NextFilter关注的事件与过滤器IoFilter相同,主要是转发相关事件。WriteRequest是会话IoSession写操作write的包装,内部有一个消息对象用于存放write的内容,一个socket地址,即会话写请求的目的socket地址,一个写请求结果返回值WriteFuture,用于获取会话write消息的操作结果。当一个过滤器从过滤链上移除时,#onPreRemove被调用,用于通知过滤器将从过滤链上移除;如果过滤器从过滤链上移除,所有IoHandler事件和IO请求,过滤器不再拦截;#onPostRemove调用通知过滤器已经从过滤链上移除;移除后,如果过滤器在过滤链的链尾,ReferenceCountingIoFilter将调用#destroy方法释放共享资源。
        IoFilter生命周期如下:
inti->onPreAdd->onPostAdd->(拦截IoHandler相关事件:sessionCreated,Opened,Idle,
exceptionCaught,Closed,messageSent,messageReceived;会话相关事件:filterWrite,filterClose)->onPreRemove
->onPostRemove->destroy。
在TCP简单通信实例这篇文章中我们用到的Mina的日志过滤器LoggingFilter如下:
//配置过滤器
DefaultIoFilterChainBuilder defaultIoFilterChainBuilder = acceptor.getFilterChain();
LoggingFilter loggingFilter = new LoggingFilter();
defaultIoFilterChainBuilder.addLast("loggingFilter", loggingFilter);

今天我们来看一下LoggingFilter。
package org.apache.mina.filter;

import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.util.SessionLog;
import org.slf4j.Logger;

/**
 * Logs all MINA protocol events to {@link Logger}.
 * 
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 * 
 * @see SessionLog
 */
public class LoggingFilter extends IoFilterAdapter {
    /**
     * Session attribute key: prefix string
     */
    public static final String PREFIX = SessionLog.PREFIX;

    /**
     * Session attribute key: {@link Logger}
     */
    public static final String LOGGER = SessionLog.LOGGER;

    /**
     * Creates a new instance.
     */
    public LoggingFilter() {
    }
    //拦截IoHandler的sessionCreated事件,将日志输出委托给SessionLog,
    //然后将相关事件传递给过滤器后继,拦截sessionOpened,sessionClosed思路基本相同
    public void sessionCreated(NextFilter nextFilter, IoSession session) {
        SessionLog.info(session, "CREATED");
        nextFilter.sessionCreated(session);
    }
    public void sessionOpened(NextFilter nextFilter, IoSession session) {
        SessionLog.info(session, "OPENED");
        nextFilter.sessionOpened(session);
    }
    public void sessionClosed(NextFilter nextFilter, IoSession session) {
        SessionLog.info(session, "CLOSED");
        nextFilter.sessionClosed(session);
    }

    public void sessionIdle(NextFilter nextFilter, IoSession session,
            IdleStatus status) {
	//先判断会话日志Info级别是否开启
        if (SessionLog.isInfoEnabled(session)) {
            SessionLog.info(session, "IDLE: " + status);
        }
        nextFilter.sessionIdle(session, status);
    }

    public void exceptionCaught(NextFilter nextFilter, IoSession session,
            Throwable cause) {
        if (SessionLog.isWarnEnabled(session)) {
            SessionLog.warn(session, "EXCEPTION:", cause);
        }
        nextFilter.exceptionCaught(session, cause);
    }

    public void messageReceived(NextFilter nextFilter, IoSession session,
            Object message) {
        if (SessionLog.isInfoEnabled(session)) {
            SessionLog.info(session, "RECEIVED: " + message);
        }
        nextFilter.messageReceived(session, message);
    }

    public void messageSent(NextFilter nextFilter, IoSession session,
            Object message) {
        if (SessionLog.isInfoEnabled(session)) {
            SessionLog.info(session, "SENT: " + message);
        }
        nextFilter.messageSent(session, message);
    }

    public void filterWrite(NextFilter nextFilter, IoSession session,
            WriteRequest writeRequest) {
        if (SessionLog.isInfoEnabled(session)) {
            SessionLog.info(session, "WRITE: " + writeRequest);
        }
        nextFilter.filterWrite(session, writeRequest);
    }

    public void filterClose(NextFilter nextFilter, IoSession session)
            throws Exception {
        SessionLog.info(session, "CLOSE");
        nextFilter.filterClose(session);
    }
}

LoggingFilter拦截IoHandler的sessionCreated事件,将日志输出委托给SessionLog,
然后将相关事件传递给过滤器后继,拦截sessionOpened,sessionClosed,filterClose事件思路基本相同。对于sessionIdle,messageSent,messageReceived,filterWrite先判断会话log的info级别是否开启,开启则输出相应事件日志。上面这些事件的日志级别都是Info;exceptionCaught则先判断会话log的warn级别是否开启,开启则输出相应事件日志。

再来看SessionLog:
package org.apache.mina.util;

import org.apache.mina.common.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//从包引入来看MINA默认用slf4j日志组件
/**
 * Provides utility methods to log protocol-specific messages.
 * <p>
 * Set {@link #PREFIX} and {@link #LOGGER} session attributes
 * to override prefix string and logger.
 *
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 *
 */
public class SessionLog {
    /**
     * Session attribute key: prefix string
     */
    public static final String PREFIX = SessionLog.class.getName() + ".prefix";

    /**
     * Session attribute key: {@link Logger}
     */
    public static final String LOGGER = SessionLog.class.getName() + ".logger";
    //获取会话IoHandler类型
    private static Class getClass(IoSession session) {
        return session.getHandler().getClass();
    }
    //Debug输出,如果会话日志开启debug级别,则输出debug日志
    public static void debug(IoSession session, String message) {
        Logger log = getLogger(session);
        if (log.isDebugEnabled()) {
            log.debug(String.valueOf(session.getAttribute(PREFIX)) + message);
        }
    }
    public static void debug(IoSession session, String message, Throwable cause) {
        Logger log = getLogger(session);
        if (log.isDebugEnabled()) {
            log.debug(String.valueOf(session.getAttribute(PREFIX)) + message,
                    cause);
        }
    }
    //获取会话日志
      private static Logger getLogger(IoSession session) {
        Logger log = (Logger) session.getAttribute(LOGGER);
        if (log == null) {
	    //如果会话日志为空,则从LoggerFactory获取Logger
            log = LoggerFactory.getLogger(getClass(session));
            String prefix = (String) session.getAttribute(PREFIX);
            if (prefix == null) {
                prefix = "[" + session.getRemoteAddress() + "] ";
                session.setAttribute(PREFIX, prefix);
            }
            //将日志log,添加到会话中
            session.setAttribute(LOGGER, log);
        }

        return log;
    }
    //判断log的debug级别是否开启
     public static boolean isDebugEnabled(IoSession session) {
        return getLogger(session).isDebugEnabled();
    }
    /*下面的info,warn,error与debug的思想是一致的这里不再说
    */
    public static void info(IoSession session, String message) {
        Logger log = getLogger(session);
        if (log.isInfoEnabled()) {
            log.info(String.valueOf(session.getAttribute(PREFIX)) + message);
        }
    }
    public static void info(IoSession session, String message, Throwable cause) {
        Logger log = getLogger(session);
        if (log.isInfoEnabled()) {
            log.info(String.valueOf(session.getAttribute(PREFIX)) + message,
                    cause);
        }
    }
    public static void warn(IoSession session, String message) {
        Logger log = getLogger(session);
        if (log.isWarnEnabled()) {
            log.warn(String.valueOf(session.getAttribute(PREFIX)) + message);
        }
    }
    public static void warn(IoSession session, String message, Throwable cause) {
        Logger log = getLogger(session);
        if (log.isWarnEnabled()) {
            log.warn(String.valueOf(session.getAttribute(PREFIX)) + message,
                    cause);
        }
    }
    public static void error(IoSession session, String message) {
        Logger log = getLogger(session);
        if (log.isErrorEnabled()) {
            log.error(String.valueOf(session.getAttribute(PREFIX)) + message);
        }
    }
    public static void error(IoSession session, String message, Throwable cause) {
        Logger log = getLogger(session);
        if (log.isErrorEnabled()) {
            log.error(String.valueOf(session.getAttribute(PREFIX)) + message,
                    cause);
        }
    }
    public static boolean isInfoEnabled(IoSession session) {
        return getLogger(session).isInfoEnabled();
    }
    public static boolean isWarnEnabled(IoSession session) {
        return getLogger(session).isWarnEnabled();
    }
    public static boolean isErrorEnabled(IoSession session) {
        return getLogger(session).isErrorEnabled();
    }
}

在TCP简单通信实例这篇文章中我们创建过一个测试的过滤器TestFilter,通过ReferenceCountingFilter包装添加
过滤链上:
TestFilter testFilter = new TestFilter(); 
ReferenceCountingFilter referenceCountingFilter = new ReferenceCountingFilter(testFilter);
defaultIoFilterChainBuilder.addLast("testFilter",referenceCountingFilter);

在前面过滤器的文章我们也提到过ReferenceCountingFilter,下面我们再来看一下ReferenceCountingFilter
package org.apache.mina.filter;

import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;

/**
 * An {@link IoFilter}s wrapper that keeps track of the number of usages of this filter and will call init/destroy
 * when the filter is not in use.
 *ReferenceCountingIoFilter用于包装过滤器,用于记录过滤器的使用量,
 当过滤器添加到过滤链上时,调用过滤器的init方法,从过滤链上移除时,调用过滤器的destroy方法。
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public class ReferenceCountingIoFilter implements IoFilter {
    private final IoFilter filter;//包装的过滤器

    private int count = 0;//过滤器使用计数器

    public ReferenceCountingIoFilter(IoFilter filter) {
        this.filter = filter;
    }
    public void init() throws Exception {
        // no-op, will init on-demand in pre-add if count == 0
    }
    public void destroy() throws Exception {
        //no-op, will destroy on-demand in post-remove if count == 0
    }
    public void onPostAdd(IoFilterChain parent, String name,
            NextFilter nextFilter) throws Exception {
        filter.onPostAdd(parent, name, nextFilter);
    }
    public synchronized void onPreAdd(IoFilterChain parent, String name,
            NextFilter nextFilter) throws Exception {
        if (0 == count) {
	    //当过滤器首次添加到过滤链上时,调用过滤器的init方法。
            filter.init();
            ++count;
        }

        filter.onPreAdd(parent, name, nextFilter);
    }
    public void onPreRemove(IoFilterChain parent, String name,
            NextFilter nextFilter) throws Exception {
        filter.onPreRemove(parent, name, nextFilter);
    }
    public synchronized void onPostRemove(IoFilterChain parent, String name,
            NextFilter nextFilter) throws Exception {
        filter.onPostRemove(parent, name, nextFilter);
        --count;
	//当过滤器从过滤链上完全移除时,调用过滤器的destroy方法。
        if (0 == count) {
            filter.destroy();
        }
    }
    public void exceptionCaught(NextFilter nextFilter, IoSession session,
            Throwable cause) throws Exception {
        filter.exceptionCaught(nextFilter, session, cause);
    }
    public void filterClose(NextFilter nextFilter, IoSession session)
            throws Exception {
        filter.filterClose(nextFilter, session);
    }

    public void filterWrite(NextFilter nextFilter, IoSession session,
            WriteRequest writeRequest) throws Exception {
        filter.filterWrite(nextFilter, session, writeRequest);
    }

    public void messageReceived(NextFilter nextFilter, IoSession session,
            Object message) throws Exception {
        filter.messageReceived(nextFilter, session, message);
    }

    public void messageSent(NextFilter nextFilter, IoSession session,
            Object message) throws Exception {
        filter.messageSent(nextFilter, session, message);
    }
    public void sessionClosed(NextFilter nextFilter, IoSession session)
            throws Exception {
        filter.sessionClosed(nextFilter, session);
    }
    public void sessionCreated(NextFilter nextFilter, IoSession session)
            throws Exception {
        filter.sessionCreated(nextFilter, session);
    }
    public void sessionIdle(NextFilter nextFilter, IoSession session,
            IdleStatus status) throws Exception {
        filter.sessionIdle(nextFilter, session, status);
    }
    public void sessionOpened(NextFilter nextFilter, IoSession session)
            throws Exception {
        filter.sessionOpened(nextFilter, session);
    }
}

一个过滤器可以多次添加到过滤链上,如何保证过滤器第一次添加到过滤链上时,初始化过滤器,完全从过滤链上移除时,销毁过滤器,释放资源?这就是ReferenceCountingIoFilter的作用,ReferenceCountingIoFilter同时也是一个过滤器,内部一个count用于记录过滤器filter添加到过滤链上的次数,即过滤链上存在filter的个数,一个filter,即ReferenceCountingIoFilter包装的过滤器,触发IoHandler和IoSession的相关事件,直接交给包装的内部filter处理。


总结:
     LoggingFilter拦截IoHandler的sessionCreated事件,将日志输出委托给SessionLog,然后将相关事件传递给过滤器后继,拦截sessionOpened,sessionClosed,
filterClose事件思路基本相同。对于sessionIdle,messageSent,messageReceived,
filterWrite先判断会话log的info级别是否开启,开启则输出相应事件日志。上面这些事件的日志级别都是Info;exceptionCaught则先判断会话log的warn级别是否开启,开启则输出相应
事件日志。
     一个过滤器可以多次添加到过滤链上,ReferenceCountingIoFilter用于保证过滤器第一次添加到过滤链上时,初始化过滤器,完全从过滤链上移除时,销毁过滤器,释放资源;ReferenceCountingIoFilter内部一个count用于记录包装过滤器filter添加到过滤链上的次数,即过滤链上存在filter的个数,一个filter,即ReferenceCountingIoFilter包装的过滤器,ReferenceCountingIoFilter触发IoHandler和IoSession的相关事件,直接交给包装的内部filter处理。
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics