`
Donald_Draper
  • 浏览: 950198 次
社区版块
存档分类
最新评论

Mina 协议编解码过滤器一(协议编解码工厂、协议编码器)

    博客分类:
  • Mina
阅读更多
MINA TCP简单通信实例:http://donald-draper.iteye.com/blog/2375297
MINA 编解码器实例:http://donald-draper.iteye.com/blog/2375317
MINA 多路分离解码器实例:http://donald-draper.iteye.com/blog/2375324
Mina Socket会话配置:http://donald-draper.iteye.com/blog/2375529
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
Mina 过滤器定义:http://donald-draper.iteye.com/blog/2376161
Mina 日志过滤器与引用计数过滤器:http://donald-draper.iteye.com/blog/2376226
Mina 过滤链抽象实现:http://donald-draper.iteye.com/blog/2376335
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
引言:
上面几篇文章我们简单看了一下Socket会话配置,过滤器及过滤链;在TCP简单通信实例这篇文章中,有这么一段代码:
//配置过滤器
DefaultIoFilterChainBuilder defaultIoFilterChainBuilder = acceptor.getFilterChain();
LoggingFilter loggingFilter = new LoggingFilter();
defaultIoFilterChainBuilder.addLast("loggingFilter", loggingFilter);
TextLineCodecFactory textLineCodecFactory = 
        new TextLineCodecFactory(charset,LineDelimiter.WINDOWS.getValue(),
		 LineDelimiter.WINDOWS.getValue());
ProtocolCodecFilter protocolCodecFilter = new ProtocolCodecFilter(textLineCodecFactory);
defaultIoFilterChainBuilder.addLast("protocolCodecFilter",protocolCodecFilter);

前面我们看过日志过滤器,今天我们来看一下协议编解码过滤器ProtocolCodecFilter。

/**
 * An {@link IoFilter} which translates binary or protocol specific data into
 * message object and vice versa using {@link ProtocolCodecFactory},
 * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
 *
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public class ProtocolCodecFilter extends IoFilterAdapter
{
    private static final Logger LOGGER = LoggerFactory.getLogger(org/apache/mina/filter/codec/ProtocolCodecFilter);
    private static final Class EMPTY_PARAMS[] = new Class[0];
    private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
    //编码器属性key
    private static final AttributeKey ENCODER = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "encoder");
    //编码器输出属性key
    private static final AttributeKey ENCODER_OUT = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "encoderOut");
    //解码器属性key
    private static final AttributeKey DECODER = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "decoder");  
    //解码器输出属性key
    private static final AttributeKey DECODER_OUT = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "decoderOut");
    private final ProtocolCodecFactory factory;//编解码器工厂协议
}

//属性key
public final class AttributeKey
    implements Serializable
{
    private static final long serialVersionUID = -583377473376683096L;
    private final String name;
    public AttributeKey(Class source, String name)
    {
        this.name = (new StringBuilder()).append(source.getName()).append('.').append(name).append('@').append(Integer.toHexString(hashCode())).toString();
    }
    public String toString()
    {
        return name;
    }
    public int hashCode()
    {
        int h = 629 + (name != null ? name.hashCode() : 0);
        return h;
    }
    public boolean equals(Object obj)
    {
        if(this == obj)
            return true;
        if(!(obj instanceof AttributeKey))
        {
            return false;
        } else
        {
            AttributeKey other = (AttributeKey)obj;
            return name.equals(other.name);
        }
    }
}

在往下看之前,我们先来看一下协议编解码工厂ProtocolCodecFactory
/**
 * Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates
 * binary or protocol specific data into message object and vice versa.
 * <p>协议编解码工厂ProtocolCodecFactory提供协议编码器和解码器,解码器二进制数据或
 协议数据到消息对象;编码器反之。
 * Please refer to
 * [url=../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html]<code>ReverserProtocolProvider</code>[/url]
 * example.
 *  
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public interface ProtocolCodecFactory {
    /**
     * Returns a new (or reusable) instance of {@link ProtocolEncoder} which
     * encodes message objects into binary or protocol-specific data.
     返回一个编码器实例,用于将消息对象编码成二进制或协议数据
     */
    ProtocolEncoder getEncoder() throws Exception;
    /**
     * Returns a new (or reusable) instance of {@link ProtocolDecoder} which
     * decodes binary or protocol-specific data into message objects.
     返回一个解码器实例,用于将二进制或协议数据解码成消息对象
     */
    ProtocolDecoder getDecoder() throws Exception;
}

从上可以看出,协议编解码工厂ProtocolCodecFactory提供协议编码器ProtocolEncoder和解码器ProtocolDecoder;编码器将消息对象编码成二进制或协议数据,解码器将二进制或协议数据解码成消息对象。

再来看编码器与解码器的定义,先来看编码器:
/**
 * Encodes higher-level message objects into binary or protocol-specific data.
 * MINA invokes {@link #encode(IoSession, Object, ProtocolEncoderOutput)}
 * method with message which is popped from the session write queue, and then
 * the encoder implementation puts encoded {@link ByteBuffer}s into
 * {@link ProtocolEncoderOutput} by calling
 * {@link ProtocolEncoderOutput#write(ByteBuffer)}.
 * <p>协议编码器用于编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode
 方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write
 方法将编码后的消息放在ByteBuffer。
 * Please refer to
 * [url=../../../../../xref-examples/org/apache/mina/examples/reverser/TextLineEncoder.html]<code>TextLineEncoder</code>[/url]
 * example. 
 * 
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public interface ProtocolEncoder {

    /**
     * Encodes higher-level message objects into binary or protocol-specific data.
     * MINA invokes {@link #encode(IoSession, Object, ProtocolEncoderOutput)}
     * method with message which is popped from the session write queue, and then
     * the encoder implementation puts encoded {@link ByteBuffer}s into
     * {@link ProtocolEncoderOutput}.
     * 协议编码器用于编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode
      方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write
     方法将编码后的消息放在ByteBuffer。
     * @throws Exception if the message violated protocol specification
     */
    void encode(IoSession session, Object message, ProtocolEncoderOutput out)
            throws Exception;

    /**
     * Releases all resources related with this encoder.
     * 释放编码器关联的所有资源
     * @throws Exception if failed to dispose all resources
     */
    void dispose(IoSession session) throws Exception;
}

//ProtocolEncoderAdapter
/**
 * An abstract {@link ProtocolEncoder} implementation for those who don't have any
 * resources to dispose.
 * ProtocolEncoderAdapter为协议编码的抽象实现,主要是对应没有任何资源要释放的协议编码。
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public abstract class ProtocolEncoderAdapter implements ProtocolEncoder {
    /**
     * Override this method dispose all resources related with this encoder.
     * The default implementation does nothing.
     重写dispose,释放相关资源,默认为does nothing
     */
    public void dispose(IoSession session) throws Exception {
    }
}

再来看协议编码输出ProtocolEncoderOutput
/**
 * Callback for {@link ProtocolEncoder} to generate encoded {@link ByteBuffer}s.
 * {@link ProtocolEncoder} must call {@link #write(ByteBuffer)} for each encoded
 * message.
 * 
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public interface ProtocolEncoderOutput {
    /**
     * Callback for {@link ProtocolEncoder} to generate encoded
     * {@link ByteBuffer}s. {@link ProtocolEncoder} must call
     * {@link #write(ByteBuffer)} for each encoded message.
     * 协议编码器回调此方法,编码消息到ByteBuffer
     * @param buf the buffer which contains encoded data
     */
    void write(ByteBuffer buf);

    /**
     * Merges all buffers you wrote via {@link #write(ByteBuffer)} into
     * one {@link ByteBuffer} and replaces the old fragmented ones with it.
     * This method is useful when you want to control the way MINA generates
     * network packets.
     合并所有通过#write(ByteBuffer)产生的字节buffer,replaces the old fragmented ones with it
     当你需要控制mina产生网络包的时候,此方非常有用。
     */
    void mergeAll();

    /**
     * Flushes all buffers you wrote via {@link #write(ByteBuffer)} to
     * the session.  This operation is asynchronous; please wait for
     * the returned {@link WriteFuture} if you want to wait for
     * the buffers flushed.
     * 刷新所有通过write(ByteBuffer)写到会话的字节buffer。此操作为异步;
     如果想等待buffer刷新完成,可以等待返回的结果WriteFuture
     * @return <tt>null</tt> if there is nothing to flush at all.
     */
    WriteFuture flush();
}

来看一下ProtocolEncoderOutput的简单实现
/**
 * A {@link ProtocolEncoderOutput} based on queue.
 *
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public abstract class SimpleProtocolEncoderOutput implements
        ProtocolEncoderOutput {
    private final Queue bufferQueue = new Queue();//buffer队列
    public SimpleProtocolEncoderOutput() {
    }
    public Queue getBufferQueue() {
        return bufferQueue;
    }
    //将buffer添加到buffer队列
    public void write(ByteBuffer buf) {
        bufferQueue.push(buf);
    }
    //合并
    public void mergeAll() {
        int sum = 0;
        final int size = bufferQueue.size();
        if (size < 2) {
            // no need to merge!,长度小2,不许要合并
            return;
        }
        // Get the size of merged BB,计算buffer队列中,所有buffer的实际数据容量
        for (int i = size - 1; i >= 0; i--) {
            sum += ((ByteBuffer) bufferQueue.get(i)).remaining();
        }
        // Allocate a new BB that will contain all fragments,创建一个容量为sum的字节buffer
        ByteBuffer newBuf = ByteBuffer.allocate(sum);
        // and merge all.
	//遍历buffer队列所有buffer,放在newBuf中,并释放原始buffer
        for (;;) {
            ByteBuffer buf = (ByteBuffer) bufferQueue.pop();
            if (buf == null) {
                break;
            }
            newBuf.put(buf);
            buf.release();
        }
        // Push the new buffer finally.读写模式切换
        newBuf.flip();
        bufferQueue.push(newBuf);//将合并后的buffer添加到buffer队列
    }
    //刷新buffer队列数据
    public WriteFuture flush() {
        Queue bufferQueue = this.bufferQueue;
        WriteFuture future = null;
        if (bufferQueue.isEmpty()) {
            return null;
        } else {
	    遍历buffer队列所有buffer,发送buffer数据
            for (;;) {
                ByteBuffer buf = (ByteBuffer) bufferQueue.pop();
                if (buf == null) {
                    break;
                }

                // Flush only when the buffer has remaining.
                if (buf.hasRemaining()) {
		    //委托给doFlush
                    future = doFlush(buf);
                }
            }
        }
        return future;
    }
    //待子类扩展
    protected abstract WriteFuture doFlush(ByteBuffer buf);
}


从上面可以看出编码器ProtocolEncoder主要有两个方法,encode和dispose;
encode用于,编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode
方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write方法将编码后的消息放在ByteBuffer;dispose方法释放编码器资源。ProtocolEncoderAdapter为编码器抽象实现,默认实现了dispose,不做任何事情,对于不需要释放资源的编码器继承ProtocolEncoderAdapter。ProtocolEncoderOutput主要的工作是将协议编码器编码后的
字节buffer,缓存起来,等待flush方法调用时,则将数据发送出去。SimpleProtocolEncoderOutput为ProtocolEncoderOutput的简单实现内部有一个buffer队列bufferQueue(Queue),用于存放write(ByteBuffer)方法,传入的字节buffer;mergeAll方法为合并buffer队列的所有buffer数据到一个buffer;flush方法为发送buffer队列中的所有buffer,实际发送工作委托给doFlush方法待子类实现。

来看协议编解码过滤器ProtocolCodecFilter的ProtocolEncoderOutput的内部实现:
private static class ProtocolEncoderOutputImpl extends
            SimpleProtocolEncoderOutput {
        private final IoSession session;

        private final NextFilter nextFilter;

        private final WriteRequest writeRequest;

        public ProtocolEncoderOutputImpl(IoSession session,
                NextFilter nextFilter, WriteRequest writeRequest) {
            this.session = session;
            this.nextFilter = nextFilter;
            this.writeRequest = writeRequest;
        }
       //doFlush
        protected WriteFuture doFlush(ByteBuffer buf) {
	   //创建写操作返回结果
            WriteFuture future = new DefaultWriteFuture(session);
	    //将会话写事件filterWrite传递给下一个过滤器
            nextFilter.filterWrite(session, new WriteRequest(
                    new HiddenByteBuffer(buf), future, writeRequest
                            .getDestination()));
            return future;
        }
    }

ProtocolEncoderOutputImpl的doFlush,首先将会话包装成DefaultWriteFuture,
将会话,写请求信息传递给NextFilter
//ProtocolCodecFilter
private static class HiddenByteBuffer extends ByteBufferProxy {
    private HiddenByteBuffer(ByteBuffer buf) {
        super(buf);
    }
}

/**
 * A {@link ByteBuffer} that wraps a buffer and proxies any operations to it.
 * <p>ByteBufferProxy可以理解为字节buffer的静态代理,所有的方法都是委托给内部的字节buf。
 * You can think this class like a {@link FilterOutputStream}.  All operations
 * are proxied by default so that you can extend this class and override existing
 * operations selectively.  You can introduce new operations, too.
 * 这个有点像FilterOutputStream,所有的代理操作默认都是通过内部buf完成,
 亦可以选择重新一些方法
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public class ByteBufferProxy extends ByteBuffer {
    /**
     * The buffer proxied by this proxy.
     */
    protected ByteBuffer buf;
    /**
     * Create a new instance.
     * @param buf the buffer to be proxied
     */
    protected ByteBufferProxy(ByteBuffer buf) {
        if (buf == null) {
            throw new NullPointerException("buf");
        }
        this.buf = buf;
    }
    public void acquire() {
        buf.acquire();
    }
    public void release() {
        buf.release();
    }
    public ByteBuffer flip() {
        buf.flip();
        return this;
    }
    ...
    从上面看所有方法都是委托给内部buf
}

这一篇,我们先看到这,自此我们了协议编解码过滤器的内部协议编解码工厂ProtocolCodecFactory,及内部的ProtocolEncoderOutputImpl,先来总结一下,至于协议编解码工厂在协议编解码过滤器ProtocolCodecFilter中的作用我们在后面的文章再说。
总结:

   协议编解码过滤器ProtocolCodecFilter关联一个协议编解码工厂ProtocolCodecFactory,协议编解码工厂提供协议编码器ProtocolEncoder和解码器ProtocolDecoder;编码器将消息对象编码成二进制或协议数据,解码器将二进制或协议数据解码成消息对象。编码器ProtocolEncoder主要有两个方法,encode和dispose;encode用于,编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write方法将编码后的消息放在ByteBuffer;dispose方法释放编码器资源。ProtocolEncoderAdapter为编码器抽象实现,默认实现了dispose,不做任何事情,对于不需要释放资源的编码器继承ProtocolEncoderAdapter。ProtocolEncoderOutput主要的工作是将协议编码器编码后的字节buffer,缓存起来,等待flush方法调用时,则将数据发送出去。SimpleProtocolEncoderOutput为ProtocolEncoderOutput的简单实现内部有一个buffer队列bufferQueue(Queue),用于存放write(ByteBuffer)方法,传入的字节buffer;
mergeAll方法为合并buffer队列的所有buffer数据到一个buffer;flush方法为发送buffer队列中的所有buffer,实际发送工作委托给doFlush方法待子类实现。ProtocolEncoderOutputImpl为协议编解码过滤器的内部类,ProtocolEncoderOutputImpl的doFlush,首先将会话包装成DefaultWriteFuture,将会话,写请求信息传递给NextFilter。

附:
/**
 * A default implementation of {@link WriteFuture}.
 * 
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public class DefaultWriteFuture extends DefaultIoFuture implements WriteFuture 




DefaultIoFuture:



  • 大小: 20.6 KB
  • 大小: 51.6 KB
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics